flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: error when eun program left outer join
Date Mon, 27 Apr 2015 12:59:01 GMT
Hi,

What data are you using?

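The exception says "NullFieldException: Field 1 is null, but expected to hold
a value.". Maybe the data is not in the right format? Note also that the stack
trace shows the exception being thrown from the out.collect() call in
LeftOuterJoin.coGroup, where your code emits a Tuple2 whose second field
(field 1) is null when there is no matching right element.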

On Mon, Apr 27, 2015 at 2:32 PM, hagersaleh <loveallah1987@yahoo.com> wrote:

> I want to implement a left outer join of two datasets; I use the Tuple data type.
>
>
> package org.apache.flink.examples.java.relational;
>
>
> import org.apache.flink.api.common.functions.CoGroupFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> import java.io.File;
>
> @SuppressWarnings("serial")
> public class TPCHQuery3 {
>
> // field name in customer table
>
> public static class LeftOuterJoin implements
> CoGroupFunction<Tuple2<Tuple1<Integer>, String>,
> Tuple2<Tuple1<Integer>, String>,
> Tuple2<Tuple1<Integer>,Tuple1<Integer>>> {
>
>     @Override
>     public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>>
> leftElements,
>                         Iterable<Tuple2<Tuple1<Integer>, String>>
> rightElements,
>
> Collector<Tuple2<Tuple1<Integer>,Tuple1<Integer>>> out) throws
> Exception {
>
>
>
>             for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements)
> {
>                     boolean hadElements = false;
>                     for (Tuple2<Tuple1<Integer>, String> rightElem :
> rightElements) {
>                             out.collect(new
> Tuple2<Tuple1<Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0));
>                             hadElements = true;
>                     }
>                     if (!hadElements) {
>                             out.collect(new Tuple2<Tuple1<Integer>,
> Tuple1<Integer>>(leftElem.f0, null));
>                     }
>             }
>
>     }
>   }
>
> public static void main(String[] args) throws Exception {
>
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple1<Integer>>
> leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
>                                         .fieldDelimiter('|')
>
> .includeFields("10000000").ignoreFirstLine()
>                                         .types(Integer.class);
>
>    // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
>     DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map(
>         new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
> String>>() {
>                 @Override
>                 public Tuple2<Tuple1<Integer>, String>
> map(Tuple1<Integer> x) throws Exception {
>                         return new Tuple2<Tuple1<Integer>, String>(x,
> "some data");
>                 }
>         });
> DataSet<Tuple1<Integer>>
> rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
>                                         .fieldDelimiter('|')
>
> .includeFields("010000000").ignoreFirstLine()
>                                         .types(Integer.class);
>    // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8,
> 9,
> 10);
>     DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 =
> rightSide.map(
>         new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
> String>>() {
>                 @Override
>                 public Tuple2<Tuple1<Integer>, String>
> map(Tuple1<Integer> x) throws Exception {
>                         return new Tuple2<Tuple1<Integer>, String>(x,
> "some other data");
>                 }
>         });
>     DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin
=
> leftSide2.coGroup(rightSide2)
>             .where(0)
>             .equalTo(0)
>             .with(new LeftOuterJoin());
>
>     leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
> "\n", "|");;
>     env.execute();
>
> }
>
>
> Error message after running the program:
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException:
> org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
> hold a value.
>     at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
>     at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at
>
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>     at
>
> org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>     at
>
> org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>     at
>
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>     at
>
> org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
>     at
>
> org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>     at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>     at java.lang.Thread.run(Thread.java:724)
>
>     at
>
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
>     at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
>     at
>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
>     at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
>     at
>
> org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Mime
View raw message