flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Left join with unbalanced dataset
Date Wed, 03 Feb 2016 09:44:59 GMT
Hi!

I think the closed channel is actually a side effect of the process being
killed. Before the exception, you can see "15:22:47,592 ERROR
org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM"
in the log, which means that UNIX is killing the process.
I assume that the first thing that happens is that UNIX closes the open
file handles while the JVM shutdown hooks are still in progress. Hence the
exception.
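
The ordering argument above can be checked mechanically. The following is only a minimal sketch, not Flink tooling: it takes three entries from the quoted TaskManager log (reflowed onto single lines and shortened), compares the timestamp of the SIGTERM entry with that of the first task error, and decodes the container exit code 143 reported by YARN. The helper names `parse_ts` and `first_match` are hypothetical and exist only for this illustration.

```python
from datetime import datetime
import signal

# Entries reflowed from the quoted TaskManager log (the archive wraps them).
LOG = [
    "15:17:22,197 INFO org.apache.flink.yarn.YarnTaskManager - Unregistering task and sending final execution state FINISHED to JobManager",
    "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM",
    "15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code:  CHAIN GroupReduce",
]


def parse_ts(line):
    """Parse the leading HH:MM:SS,mmm timestamp of a log line."""
    return datetime.strptime(line.split(" ", 1)[0], "%H:%M:%S,%f")


def first_match(lines, needle):
    """Return the timestamp of the first line containing `needle`, or None."""
    for line in lines:
        if needle in line:
            return parse_ts(line)
    return None


sigterm_at = first_match(LOG, "RECEIVED SIGNAL 15")
error_at = first_match(LOG, "Error in task code")
gap_ms = (error_at - sigterm_at).total_seconds() * 1000

# Exit codes above 128 conventionally mean "terminated by signal (code - 128)".
exit_signal = signal.Signals(143 - 128).name

print("SIGTERM logged before first task error:", sigterm_at < error_at)
print("gap in milliseconds:", round(gap_ms))
print("exit code 143 corresponds to:", exit_signal)
```

On this extract the SIGTERM entry comes 16 ms before the first "I/O channel already closed" error, and exit code 143 maps to signal 15, which matches the "Container killed on request" diagnostics.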

So, the root cause is still the YARN memory killer.
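
For a rough sense of the numbers involved, the sketch below does the arithmetic on values copied from the quoted container log. It is a back-of-the-envelope check under stated assumptions, not a diagnosis: it assumes 4 KiB memory pages (the log does not state the page size) and reads YARN's "GB" as GiB. It does not show which component actually used the memory.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3

# Values copied from the quoted container log.
heap_max_mib = 7200        # -Xmx7200m
direct_max_mib = 7200      # -XX:MaxDirectMemorySize=7200m
container_limit_gib = 12   # "12.1 GB of 12 GB physical memory used"
rss_pages = 3163462        # RSSMEM_USAGE(PAGES) of the java process

PAGE_SIZE = 4096  # assumption: 4 KiB pages; not stated in the log

# Upper bound that the JVM flags allow for heap plus direct memory alone.
jvm_ceiling_gib = (heap_max_mib + direct_max_mib) * MIB / GIB

# Resident set size YARN measured when it killed the container.
rss_gib = rss_pages * PAGE_SIZE / GIB

print(f"heap + direct ceiling: {jvm_ceiling_gib:.2f} GiB")
print(f"measured RSS:          {rss_gib:.2f} GiB")
print("ceiling exceeds limit:", jvm_ceiling_gib > container_limit_gib)
print("RSS exceeds limit:    ", rss_gib > container_limit_gib)
```

Under these assumptions the flags permit about 14.06 GiB of heap plus direct memory inside a 12 GB container, and the measured resident size of about 12.07 GiB agrees with the "12.1 GB" in the YARN diagnostics.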

The log comes from release version 0.10.0.
The Netty fix came into Flink after version 0.10.1 - so it is currently
only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).

Greetings,
Stephan


On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
wrote:

> Hi,
>
>
>
> I see nothing wrong in the log of the killed container (it’s in fact
> strange that it fails with I/O channel closure before it is killed by
> yarn), but I’ll post new logs with memory debug as a web download within
> the day.
>
>
>
> In the meantime, here is a log extract:
>
>
>
> Container: container_e11_1453202008841_2868_01_000018 on
> h1r1dn06.bpa.bouyguestelecom.fr_45454
>
>
> ================================================================================================
>
>
>
> …
>
> 15:04:01,234 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --------------------------------------------------------------------------------
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting
> YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14
> UTC)
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current
> user: datcrypt
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum
> heap size: 6900 MiBytes
>
> 15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner
>       -  JAVA_HOME: /usr/java/default
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop
> version: 2.6.0
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM
> Options:
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Xms7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Xmx7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -XX:MaxDirectMemorySize=7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlogback.configurationFile=file:logback.xml
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlog4j.configuration=file:log4j.properties
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program
> Arguments:
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --configDir
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --streamingMode
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
>
> 15:04:01,238 INFO
>  org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --------------------------------------------------------------------------------
>
> …
>
> 15:04:02,215 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting
> TaskManager actor
>
> 15:04:02,224 INFO
> org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig
> [server address: bt1shlhr/172.21.125.16, server port: 47002, memory
> segment size (bytes): 32768, transport type: NIO, number of server threads:
> 0 (use Netty's default), number of client threads: 0 (use Netty's default),
> server connect backlog: 0 (use Netty's default), client connect timeout
> (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
>
> 15:04:02,226 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Messages
> between TaskManager and JobManager have a max timeout of 100000 milliseconds
>
> …
>
>
>
> 15:04:02,970 INFO
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
> 1024 MB for network buffer pool (number of memory segments: 32768, bytes
> per segment: 32768).
>
> 15:04:03,527 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7
> of the currently free heap space for Flink managed heap memory (4099 MB).
>
> 15:04:06,250 INFO
> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
> uses directory
> /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8
> for spill files.
>
> …
>
> 15:04:06,429 INFO
> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
> data connection information: h1r1dn06.bpa.bouyguestelecom.fr
> (dataPort=47002)
>
> 15:04:06,430 INFO
> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
> has 2 task slot(s).
>
> 15:04:06,431 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Memory
> usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB
> (used/committed/max)]
>
> 15:04:06,438 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Trying to
> register at JobManager akka.tcp://
> flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500
> milliseconds)
>
> 15:04:06,591 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Successful
> registration at JobManager (akka.tcp://
> flink@172.21.125.31:36518/user/jobmanager), starting network stack and
> library cache.
>
> …
>
>
>
> 15:17:22,191 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FINISHED to JobManager
> for task DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
> (c9dc588ceb209d98fd08b5144a59adfc)
>
> 15:17:22,196 INFO
> org.apache.flink.runtime.taskmanager.Task                     - DataSink
> (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
> switched to FINISHED
>
> 15:17:22,197 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>
> 15:17:22,197 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FINISHED to JobManager
> for task DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
> (0c1c027e2ca5111e3e54c98b6d7265d7)
>
> 15:22:47,592 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED
> SIGNAL 15: SIGTERM
>
> 15:22:47,608 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,608 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,617 INFO
> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to
> FAILED with exception.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,619 INFO
> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to
> FAILED with exception.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,627 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>
> 15:22:47,627 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>
> 15:22:47,664 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FAILED to JobManager
> for task CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
>
> 15:22:47,738 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FAILED to JobManager
> for task CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
>
> 15:22:47,841 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1)
> (88/95)
>
> com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel
> already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
>
>         at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
>
>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>
>         at
> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>
>         at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>
>         at
> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
>
>         at
> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
>
>         at
> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
>
>         at
> org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
>
>         at
> org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
>
>         at
> com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
>
>         at
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
>
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
>
>         ... 25 more
>
>
>
>
>
> (...)
>
> ______________________
>
> 15:22:51,798 INFO
> org.apache.flink.yarn.YarnJobManager                          - Container
> container_e11_1453202008841_2868_01_000018 is completed with diagnostics:
> Container
> [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is
> running beyond physical memory limits. Current usage: 12.1 GB of 12 GB
> physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing
> container.
>
> Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
>
>         |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
>         |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c
> /usr/java/default/bin/java -Xms7200m -Xmx7200m
> -XX:MaxDirectMemorySize=7200m
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out
> 2>
> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err
> --streamingMode batch
>
>         |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462
> /usr/java/default/bin/java -Xms7200m -Xmx7200m
> -XX:MaxDirectMemorySize=7200m
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode
> batch
>
>
>
> Container killed on request. Exit code is 143
>
> Container exited with a non-zero exit code 143
>
>
>
>
>
>
>
>
>
> *From:* ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] *On behalf
> of* Stephan Ewen
> *Sent:* Tuesday, 2 February 2016 20:20
> *To:* user@flink.apache.org
> *Subject:* Re: Left join with unbalanced dataset
>
>
>
> To make sure this discussion does not go in a wrong direction:
>
>
>
> There is no issue here with data size, or memory management. The
> MemoryManagement for sorting and hashing works, and Flink handles the
> spilling correctly, etc.
>
>
>
> The issue here is different:
>
>    - One possible reason is that the network stack (specifically the Netty
> library) allocates too much direct (= off heap) memory for buffering the
> TCP connections.
>
>    - Another reason could be leaky behavior in Hadoop's HDFS code.
>
>
>
>
>
> @Arnaud: We need the full log of the TaskManager that initially
> experiences that failure, then we can look into this. Ideally it would be
> with memory logging activated, as suggested by Ufuk.
>
>
>
> Best,
>
> Stephan
>
>
>
> ------------------------------
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>
