flink-user mailing list archives

From Mihail Vieru <vi...@informatik.hu-berlin.de>
Subject Re: "No space left on device" IOException when using Cross operator
Date Tue, 16 Jun 2015 08:09:35 GMT
Hi Stephan,

thank you for your explanation.
I thought I would be getting just 100 MB of results after the Cross,
which is why I used it.
I will try something else then, most likely a Map on the input.
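If the graph is undirected, the distance from i to j equals the distance from j to i. That is an assumption; the thread does not say so. In that case the second check in the CrossFunction quoted below is redundant, and each vertex can evaluate the threshold on its own distance vector alone. This is what makes a Map-style pass over a single input possible. More precisely it is a FlatMap, since one vertex can produce several records. The plain-Java sketch compares that per-vertex scan with the all-pairs evaluation a Cross performs. It is not Flink code, and the class name, method names, and the int[][] matrix standing in for the Integer[] distance vectors are hypothetical. It also emits only the vertex id, where the original function returns (id, label).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch. ASSUMPTION: distances are symmetric (undirected graph),
 * i.e. dist[i][j] == dist[j][i], with vertex ids used as array indices.
 */
final class PerVertexThreshold {

    /** Reference: evaluate the predicate on every ordered pair, as a Cross would. */
    static List<Integer> viaAllPairs(int[][] dist, int threshold) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < dist.length; i++) {
            for (int j = 0; j < dist.length; j++) {
                if (i != j && dist[i][j] <= threshold && dist[j][i] <= threshold) {
                    out.add(i);
                }
            }
        }
        return out;
    }

    /** FlatMap-style: each vertex reads only its own vector and emits its id per match. */
    static List<Integer> viaPerVertexScan(int[][] dist, int threshold) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < dist.length; i++) {
            // No second record is needed: under symmetry, dist[j][i] == dist[i][j]
            for (int j = 0; j < dist[i].length; j++) {
                if (i != j && dist[i][j] <= threshold) {
                    out.add(i);
                }
            }
        }
        return out;
    }
}
```

Both methods return the same list when the matrix is symmetric. Only the second one can run as a single-input transformation, because it never needs to pair two records.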

Best,
Mihail

On 16.06.2015 04:27, Stephan Ewen wrote:
> Cross is a quadratic operation. As such, it produces very large
> results on moderate inputs, which can easily exceed memory and disk
> space if the subsequent operation needs to gather all data (as the
> sort in your case does).
>
> If both inputs consist of 10 MB of 100-byte elements (100K elements
> per input), you end up with 10 billion elements after the Cross, which
> is 1 TB in size (assuming the result elements are also 100 bytes).
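The arithmetic in the paragraph above can be reproduced with a small plain-Java sketch. It is not Flink code, and the class and method names are hypothetical. It assumes decimal units (10 MB = 10,000,000 bytes, 1 TB = 10^12 bytes) and fixed-size elements, as the paragraph does.

```java
/** Hypothetical helper: back-of-the-envelope size of a Cross (Cartesian product) result. */
final class CrossSizeEstimate {

    /** Number of fixed-size elements in an input of the given size. */
    static long elementCount(long inputBytes, long elementBytes) {
        return inputBytes / elementBytes;
    }

    /** A Cross produces one result per pair, so |IN1| * |IN2| results. */
    static long crossCount(long count1, long count2) {
        return Math.multiplyExact(count1, count2); // throws on long overflow
    }

    /** Total output bytes, assuming every result element has the same size. */
    static long crossBytes(long count1, long count2, long resultElementBytes) {
        return Math.multiplyExact(crossCount(count1, count2), resultElementBytes);
    }
}
```

With 10,000,000-byte inputs of 100-byte elements, `elementCount` gives 100,000 per input, `crossCount` gives 10,000,000,000, and `crossBytes` with 100-byte results gives 10^12 bytes. For the 1000-tuple inputs in the original message, the Cross function is invoked on 1000 × 1000 = 1,000,000 pairs. The byte volume then depends on the size of each result element, which the thread does not state.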
>
> This is an inherent issue of applying a quadratic operation to data
> that is too large to be handled by a quadratic operation. Not much
> anyone can do about this.
>
> Try to see whether you can replace the Cross operation with something
> else (Join, CoGroup), or whether you can at least filter aggressively
> after the Cross, before the next operation.
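The following sketches the Join route for the threshold filter in the CrossFunction quoted further down. It assumes the distance vectors fit in an int[][] matrix indexed by vertex id. Each vertex emits a candidate key (i, j) only for targets within the threshold, and a pair survives only if its reverse (j, i) was emitted as well. This is plain Java, not the Flink API. In a Flink job the two steps would presumably become a FlatMap and a self-Join on the pair key, but that mapping is an assumption, as are the class and method names. A HashSet plays the role of the join's hash table.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: "emit candidates, then join" instead of Cross + filter.
 * dist[i][j] is the distance from vertex i to vertex j (ids are array indices).
 */
final class CandidateJoin {

    /** Packs an ordered pair of int ids into a single long join key. */
    private static long key(int from, int to) {
        return ((long) from << 32) | (to & 0xFFFFFFFFL);
    }

    /** Returns the first vertex id of every ordered pair (i, j) that passes both checks. */
    static List<Integer> qualifyingSources(int[][] dist, int threshold) {
        // Step 1 (FlatMap-like): each vertex looks only at its own vector and
        // emits a candidate (i, j) for every other vertex within the threshold.
        List<int[]> candidates = new ArrayList<>();
        Set<Long> candidateKeys = new HashSet<>();
        for (int i = 0; i < dist.length; i++) {
            for (int j = 0; j < dist[i].length; j++) {
                if (i != j && dist[i][j] <= threshold) {
                    candidates.add(new int[] {i, j});
                    candidateKeys.add(key(i, j));
                }
            }
        }
        // Step 2 (Join-like): (i, j) survives only if the reverse candidate (j, i)
        // exists, i.e. dist[j][i] <= threshold as well.
        List<Integer> result = new ArrayList<>();
        for (int[] c : candidates) {
            if (candidateKeys.contains(key(c[1], c[0]))) {
                result.add(c[0]); // the original cross() returns data of the first vertex
            }
        }
        return result;
    }
}
```

The scan still reads every matrix entry, but it only materializes within-threshold candidates rather than all n² pairs. This works without assuming symmetric distances. For the matrix `{{0, 1, 3}, {4, 0, 2}, {1, 2, 0}}` and threshold 2, only the pairs (1, 2) and (2, 1) pass both checks.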
>
>
> On Mon, Jun 15, 2015 at 2:18 PM, Mihail Vieru
> <vieru@informatik.hu-berlin.de> wrote:
>
>     Hi,
>
>     I get the following *"No space left on device" IOException* when
>     using the Cross operator below.
>     The inputs for the operator are each just *10MB* in size (same
>     input for IN1 and IN2; 1000 tuples) and I get the exception after
>     Flink manages to fill *50GB* of SSD space and the partition
>     becomes full.
>
>     I have found a similar problem in the mailing list here:
>     https://mail-archives.apache.org/mod_mbox/flink-user/201412.mbox/%3CCAN0XJzNiTyWDfcDLhsP6iJVhpUgnYn0ACy4ueS2R6YSB68Fr%3DA%40mail.gmail.com%3E
>
>     As I currently have no free file system space left, specifying
>     other temporary folders for Flink is not an option.
>     Any ideas on what could help?
>
>     I'm using the latest 0.9-SNAPSHOT and run the job in a local
>     execution environment.
>
>     Best,
>     Mihail
>
>
>     java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at main(APSPNaiveVernicaJob.java:100))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
>     Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
>         at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
>         ... 3 more
>     Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:785)
>     Caused by: java.io.IOException: No space left on device
>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>         at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>         at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:340)
>         at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:471)
>
>
>
>     // Imports needed at the top of the job's source file
>     import org.apache.flink.api.common.functions.CrossFunction;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.graph.Vertex;
>
>     // Nested in the job class; grapDistThreshold is assumed to be a static
>     // field of that enclosing class (not shown in this message).
>     public static class crossKAPSPFilter implements
>             CrossFunction<Vertex<Integer, Tuple2<Integer[], String>>,
>                           Vertex<Integer, Tuple2<Integer[], String>>,
>                           Tuple2<Integer, String>> {
>
>         @Override
>         public Tuple2<Integer, String> cross(
>                 Vertex<Integer, Tuple2<Integer[], String>> vertex1,
>                 Vertex<Integer, Tuple2<Integer[], String>> vertex2) throws Exception {
>
>             // f0 of a Vertex is its id; f1.f0 is its distance vector, indexed by vertex id
>             int vertexIdFirst = vertex1.f0;
>             int vertexIdSecond = vertex2.f0;
>             Integer[] vertexDistanceVectorFirst = vertex1.f1.f0;
>             Integer[] vertexDistanceVectorSecond = vertex2.f1.f0;
>
>             // Keep the pair only if the vertices differ and each one lies
>             // within the distance threshold of the other
>             if (vertexIdFirst != vertexIdSecond
>                     && vertexDistanceVectorFirst[vertexIdSecond] <= grapDistThreshold
>                     && vertexDistanceVectorSecond[vertexIdFirst] <= grapDistThreshold) {
>                 return new Tuple2<Integer, String>(vertex1.f0, vertex1.f1.f1);
>             } else {
>                 return null;
>             }
>         }
>     }
>
>

