flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hagersaleh <loveallah1...@yahoo.com>
Subject error when run program left outer join
Date Mon, 27 Apr 2015 12:32:06 GMT
I want to implement a left outer join of two datasets; I use the Tuple data type.


package org.apache.flink.examples.java.relational;


import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.io.File;

@SuppressWarnings("serial")
public class TPCHQuery3 {

//filed name in cutomer table
 
public static class LeftOuterJoin implements
CoGroupFunction<Tuple2&lt;Tuple1&lt;Integer>, String>,
Tuple2<Tuple1&lt;Integer>, String>,
Tuple2<Tuple1&lt;Integer>,Tuple1<Integer>>> {

    @Override
    public void coGroup(Iterable<Tuple2&lt;Tuple1&lt;Integer>, String>>
leftElements,
                        Iterable<Tuple2&lt;Tuple1&lt;Integer>, String>>
rightElements,
                       
Collector<Tuple2&lt;Tuple1&lt;Integer>,Tuple1<Integer>>> out) throws
Exception {

            

            for (Tuple2<Tuple1&lt;Integer>, String> leftElem : leftElements)
{
                    boolean hadElements = false;
                    for (Tuple2<Tuple1&lt;Integer>, String> rightElem :
rightElements) {
                            out.collect(new
Tuple2<Tuple1&lt;Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0));
                            hadElements = true;
                    }
                    if (!hadElements) {
                            out.collect(new Tuple2<Tuple1&lt;Integer>,
Tuple1<Integer>>(leftElem.f0, null));
                    }
            }

    }
  }
	
public static void main(String[] args) throws Exception {
      
      
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1&lt;Integer>>
leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
					.fieldDelimiter('|')
                                       
.includeFields("10000000").ignoreFirstLine()
                                        .types(Integer.class);

   // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
    DataSet<Tuple2&lt;Tuple1&lt;Integer>, String>> leftSide2 = leftSide.map(
        new MapFunction<Tuple1&lt;Integer>, Tuple2<Tuple1&lt;Integer>,
String>>() {
                @Override
                public Tuple2<Tuple1&lt;Integer>, String>
map(Tuple1<Integer> x) throws Exception {
                        return new Tuple2<Tuple1&lt;Integer>, String>(x,
"some data");
                }
        });
DataSet<Tuple1&lt;Integer>>
rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
					.fieldDelimiter('|')
                                       
.includeFields("010000000").ignoreFirstLine()
                                        .types(Integer.class);
   // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8, 9,
10);
    DataSet<Tuple2&lt;Tuple1&lt;Integer>, String>> rightSide2 =
rightSide.map(
        new MapFunction<Tuple1&lt;Integer>, Tuple2<Tuple1&lt;Integer>,
String>>() {
                @Override
                public Tuple2<Tuple1&lt;Integer>, String>
map(Tuple1<Integer> x) throws Exception {
                        return new Tuple2<Tuple1&lt;Integer>, String>(x,
"some other data");
                }
        });
    DataSet<Tuple2&lt;Tuple1&lt;Integer>, Tuple1<Integer>>> leftOuterJoin
=
leftSide2.coGroup(rightSide2)
            .where(0)
            .equalTo(0)
            .with(new LeftOuterJoin());

    leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
"\n", "|");;
    env.execute();

}
    

Error output after running the program:
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException:
org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
hold a value.
    at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
    at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at
org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at
org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
    at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
    at
org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
    at
org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
    at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
    at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
    at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:724)

    at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
    at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
    at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
    at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
    at
org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message