flink-user mailing list archives

From Kurt Young <ykt...@gmail.com>
Subject Re: GroupReduce is interrupted on reading large CSV files
Date Sat, 01 Apr 2017 07:20:22 GMT
Hi,
Can you share your input file to help us debug the problem?


Best,
Kurt

On Thu, Mar 30, 2017 at 12:00 AM, Sapei, Ferry Syafei <FerrySyafei.Sapei@otto.de> wrote:

> Hello everyone,
>
> I have a Flink batch job that reads four CSV files. The rows in the
> files are read and grouped together.
>
> When the four CSV files are small enough, the job finishes successfully.
> However, when the input files are large, the job does not execute
> successfully and fails with the exception shown below.
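>
> The relevant part of the job looks roughly like the following. This is
> only a simplified sketch: the class name, file paths, delimiter, and
> column layout are placeholders, and the real readCsvRows method produces
> Flink Rows with BigDecimal fields (as the RowSerializer/BigDecSerializer
> frames in the trace suggest) rather than the tuples used here:
>
>     import java.math.BigDecimal;
>
>     import org.apache.flink.api.common.functions.GroupReduceFunction;
>     import org.apache.flink.api.java.DataSet;
>     import org.apache.flink.api.java.ExecutionEnvironment;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.util.Collector;
>
>     public class CsvGroupReduceSketch {
>
>         public static void main(String[] args) throws Exception {
>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>             // Read one of the CSV files; path, delimiter and columns are placeholders.
>             DataSet<Tuple2<String, String>> rows = env
>                     .readCsvFile("file:///path/to/input.csv")
>                     .fieldDelimiter(";")
>                     .types(String.class, String.class);
>
>             // Group the rows by the first column and aggregate the second one.
>             // The real job keeps BigDecimal values in its rows, which is why
>             // BigDecSerializer appears in the stack trace below.
>             DataSet<Tuple2<String, BigDecimal>> aggregated = rows
>                     .groupBy(0)
>                     .reduceGroup(new GroupReduceFunction<Tuple2<String, String>, Tuple2<String, BigDecimal>>() {
>                         @Override
>                         public void reduce(Iterable<Tuple2<String, String>> values,
>                                            Collector<Tuple2<String, BigDecimal>> out) {
>                             String key = null;
>                             BigDecimal sum = BigDecimal.ZERO;
>                             for (Tuple2<String, String> value : values) {
>                                 key = value.f0;
>                                 sum = sum.add(new BigDecimal(value.f1));
>                             }
>                             out.collect(Tuple2.of(key, sum));
>                         }
>                     });
>
>             aggregated.writeAsCsv("file:///path/to/output.csv");
>             env.execute("CSV GroupReduce sketch");
>         }
>     }
>
> In the actual job, rows from all four CSV files are grouped together, and
> the grouping is set up in readCsvRows
> (SendungsauskunftAggregatorJob.java:140), as referenced in the trace.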
>
> Could somebody please help me to fix this problem?
>
> Best regards,
>
> Ferry
>
> 2017-03-29 17:39:19,396 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter - Spilling sort buffer without large record handling.
>
> 2017-03-29 17:39:19,458 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: null
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
>         at org.apache.flink.runtime.io.disk.RandomAccessInputView.nextSegment(RandomAccessInputView.java:79)
>         at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
>         at org.apache.flink.runtime.memory.AbstractPagedInputView.readLong(AbstractPagedInputView.java:357)
>         at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:287)
>         at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:406)
>         at org.apache.flink.api.common.typeutils.base.BigIntSerializer.copyBigInteger(BigIntSerializer.java:141)
>         at org.apache.flink.api.common.typeutils.base.BigDecSerializer.copy(BigDecSerializer.java:104)
>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:212)
>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> 2017-03-29 17:39:19,759 DEBUG org.apache.flink.runtime.operators.BatchTask - Releasing all broadcast variables.: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
>
> 2017-03-29 17:39:19,660 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Spilling buffer 0.
>
> 2017-03-29 17:39:19,806 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter - Spilling sort buffer without large record handling.
>
> 2017-03-29 17:39:19,641 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.
> java.lang.InterruptedException
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Thread.join(Thread.java:1249)
>         at java.lang.Thread.join(Thread.java:1323)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.close(UnilateralSortMerger.java:480)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:367)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:362)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.internalHandleException(UnilateralSortMerger.java:842)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>
> 2017-03-29 17:39:19,641 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (1/16)
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
>         at org.apache.flink.runtime.util.DataInputDeserializer.readInt(DataInputDeserializer.java:179)
>         at org.apache.flink.api.common.typeutils.base.BigDecSerializer.readBigDecimal(BigDecSerializer.java:125)
>         at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:99)
>         at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:31)
>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> 2017-03-29 17:39:19,460 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Emitting final buffer from reader thread: 1.
>
>
>
