flink-user mailing list archives

From Pa Rö <paul.roewer1...@googlemail.com>
Subject Re: Channel received an event before completing the current partial record
Date Wed, 13 May 2015 11:16:41 GMT
My function code:
    private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
            .ignoreInvalidLines()
            .fieldDelimiter('\u0009')
            .lineDelimiter("\n")
            .includeFields(true, true, false, false, false, false, false, false, false, false, false,
                    false, false, false, false, false, false, false, false, false, false,
                    false, false, false, false, false, false, false, false, false, false,
                    false, false, false, false, false, false, false, false, true, true,
                    false, false, false, false, false, false, false, false, false, false,
                    false, false, false, false, false, false, false)
            .types(String.class, Long.class, Double.class, Double.class)
            .map(new TuplePointConverter());
    }
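
A long literal list of booleans like the one above is easy to miscount. As a rough sketch only, the same 58-entry mask (columns 0, 1, 39 and 40 set to true) could be built from column indices. The class `FieldMask` and the method `fieldMask` are hypothetical names for illustration and are not part of Flink:

```java
import java.util.Arrays;

class FieldMask {

    // Hypothetical helper (not part of Flink): builds a boolean mask of length
    // totalFields in which only the given zero-based column indices are true.
    static boolean[] fieldMask(int totalFields, int... includedIndices) {
        boolean[] mask = new boolean[totalFields];
        for (int index : includedIndices) {
            if (index < 0 || index >= totalFields) {
                throw new IllegalArgumentException("Column index out of range: " + index);
            }
            mask[index] = true;
        }
        return mask;
    }

    public static void main(String[] args) {
        // 58 columns in total; columns 0, 1, 39 and 40 are kept, as in the list above.
        boolean[] mask = fieldMask(58, 0, 1, 39, 40);
        System.out.println(Arrays.toString(mask));
    }
}
```

Assuming `includeFields` takes its booleans as a varargs parameter, as the call above suggests, the resulting array could be passed to it directly in place of the literal list.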

And I use the GDELT data from here:

http://data.gdeltproject.org/events/index.html

2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1990@googlemail.com>:

> Hi,
>
> I am reading a CSV file from disk with Flink (Java, Maven version 0.8.1) and get
> the following exception:
>
> ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.:  DataSink(Print to System.out) (4/4)
> java.lang.IllegalStateException: Channel received an event before completing the current partial record.
>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>     at java.lang.Thread.run(Thread.java:745)
>
> my code:
>
> public class FlinkMain {
>
>     public static void main(String[] args) {
>         // set up execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         //env.setDegreeOfParallelism(1);
>         // get input points
>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>         points.print();
>         // execute program
>         try {
>             env.execute("KMeans Flink");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
> Maybe someone has a solution?
>
> Best regards, Paul
>
