flink-dev mailing list archives

From "Luke Hutchison (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
Date Fri, 07 Apr 2017 06:08:41 GMT
Luke Hutchison created FLINK-6276:

             Summary: InvalidTypesException: Unknown Error. Type is null.
                 Key: FLINK-6276
                 URL: https://issues.apache.org/jira/browse/FLINK-6276
             Project: Flink
          Issue Type: Bug
          Components: Core, DataSet API
    Affects Versions: 1.2.0
            Reporter: Luke Hutchison

Quite frequently when writing Flink code, I get the exception {{InvalidTypesException: Unknown Error. Type is null.}}

A small example that triggers it is:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {

    // N-way full outer join of keyed datasets: each output tuple holds one value per
    // input dataset, with missingValuePlaceholder where a dataset lacks the key.
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                // Seed the join: wrap each value of the first dataset in a list.
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join");
            } else {
                // Append the values of dataset datasetIdx to the accumulated lists.
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                // Key absent from all previous datasets.
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                // Key absent from the current dataset.
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<K, List<V>>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));

        System.out.println(join(-1, x, y, z).collect());
    }
}
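For comparison, here is a plain-Java sketch (no Flink involved) of the result {{join(-1, x, y, z)}} appears intended to produce: an N-way full outer join on the key, where the placeholder fills the slot of any dataset that lacks the key. The class name NWayJoinSketch and the use of in-memory Map inputs (Map.of, Java 9+) are assumptions for illustration; uniqueness of keys per dataset is guaranteed by the Map type here rather than by the "Got non-unique key" check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the Flink job above (not Flink code):
// an N-way full outer join on the key, one output slot per input dataset.
public class NWayJoinSketch {

    @SafeVarargs
    public static <K extends Comparable<K>, V> Map<K, List<V>> join(V missingValuePlaceholder,
            Map<K, V>... datasets) {
        // TreeMap keeps the keys sorted, so the printed result is deterministic.
        Map<K, List<V>> joined = new TreeMap<>();
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            Map<K, V> current = datasets[i];
            // A key first seen in dataset i gets placeholders for datasets 0..i-1.
            for (K key : current.keySet()) {
                joined.computeIfAbsent(key, k -> {
                    List<V> vals = new ArrayList<>();
                    for (int j = 0; j < datasetIdx; j++) {
                        vals.add(missingValuePlaceholder);
                    }
                    return vals;
                });
            }
            // Every row gets dataset i's value, or the placeholder if the key is absent.
            for (Map.Entry<K, List<V>> row : joined.entrySet()) {
                K key = row.getKey();
                row.getValue().add(current.containsKey(key) ? current.get(key) : missingValuePlaceholder);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, Integer> x = Map.of("a", 3, "b", 4, "c", 5);
        Map<String, Integer> y = Map.of("b", 0, "c", 1, "d", 2);
        Map<String, Integer> z = Map.of("c", 7, "d", 8, "e", 9);
        System.out.println(join(-1, x, y, z));
    }
}
```

Running main prints {a=[3, -1, -1], b=[4, 0, -1], c=[5, 1, 7], d=[-1, 2, 8], e=[-1, -1, 9]}; presumably the Flink job would collect the same key/value groupings (in some order and in Tuple2 form) if its type extraction succeeded.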

The stacktrace that is triggered is:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable'
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
	at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:23)
	... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
	... 6 more

The code compiles and type-checks. Perhaps something is wrong with the code, but either way, Flink should report a more informative error message.
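The code type-checks because javac sees the full generic signatures, while the stack trace attributes the failure to type erasure: at runtime, the K and V of a generic method such as {{join}} are not bound to String and Integer. The plain-Java sketch below illustrates erasure itself; it does not use Flink's TypeExtractor, and the class ErasureDemo and method wrap are hypothetical. It shows that reflection on a generic method recovers only the type variable V, which is presumably related to why Flink cannot derive the lambda's output type without a hint such as returns(...).

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical generic helper with the same shape as the reproducer's map step:
    // it wraps a value of type V in a List<V>.
    public static <K, V> List<V> wrap(K key, V value) {
        List<V> vals = new ArrayList<>();
        vals.add(value);
        return vals;
    }

    // Reads the type argument of wrap's declared return type List<V> via reflection.
    public static Type declaredElementType() {
        try {
            // After erasure, both parameters of wrap have type Object.
            Method m = ErasureDemo.class.getDeclaredMethod("wrap", Object.class, Object.class);
            ParameterizedType ret = (ParameterizedType) m.getGenericReturnType();
            return ret.getActualTypeArguments()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> ints = wrap("a", 3);
        List<String> strs = wrap("b", "x");
        // Both calls produce the same runtime class; Integer vs String is gone.
        System.out.println(ints.getClass() == strs.getClass()); // true
        // Reflection only sees the type variable V, not a concrete class.
        Type t = declaredElementType();
        System.out.println(t instanceof TypeVariable<?>); // true
        System.out.println(t.getTypeName()); // V
    }
}
```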

A separate issue is that the error is reported for the wrong function: the problem is not the return type of {{join(TestMain.java:23)}} but some internal type within that function (probably the type of a lambda). It is the {{where}} clause that throws the exception.

This message was sent by Atlassian JIRA
