flink-issues mailing list archives

From "Luke Hutchison (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
Date Wed, 22 Mar 2017 00:46:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935576#comment-15935576 ]

Luke Hutchison edited comment on FLINK-6114 at 3/22/17 12:45 AM:
-----------------------------------------------------------------

[~greghogan] I partially reconstructed what I was doing before the originally reported exception was triggered. I was wrong about not joining on the generic type; I was actually using it as the join key. This works in the smaller tests I have tried, but I cannot get the more extensive example below to work.

Given the following code:

{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MainTest {

    public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
        // Sum within each key
        // Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Float>> blank_key_totScore =
                key_score
                        .groupBy(0).sum(1)
                        // Prepend with "" to prep for join
                        .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1))
                        // .returns(TypeInformation.of(new TypeHint<Tuple3<String, K, Float>>(){}))  // (2)
                        ;

        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys =
                blank_key_totScore
                        .distinct(1)  // (1)
                        .map(t -> new Tuple2<String, Integer>("", 1))
                        .groupBy(0).sum(1);

        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Float> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Float, Integer> t : iter) {
                                int numKeys = t.f3; // Same for all tuples
                                float fracRank = rank / (float) (numKeys - 1);
                                out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
                                rank++;
                            }
                        })
                .name("convert problem severity scores into building scores");
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
                new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
                new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
        DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
        System.out.println(ds2.collect());
    }
}
{code}

This exception is thrown at the line marked {{// (1)}}:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
	at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
	... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
	... 6 more
{noformat}

If you uncomment line {{// (2)}}, you instead get:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(org.apache.flink.api.java.DataSet)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
	at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
	at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
	at com.rentlogic.buildingscores.flink.MainTest$1.<init>(MainTest.java:24)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:24)
	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:71)
{noformat}
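For what it's worth, this second failure is consistent with how Java erasure interacts with TypeHint-style capture: an anonymous subclass records only the type *expression* it was written with, and inside a generic method that expression is just the type variable K, not a concrete class. A minimal plain-JDK sketch (no Flink involved; {{ErasureDemo}} and {{Hint}} are hypothetical names for illustration):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {
    // Mimics the TypeHint pattern: an anonymous subclass captures the type
    // argument it was declared with via its generic superclass.
    static abstract class Hint<T> {
        final Type captured =
                ((ParameterizedType) getClass().getGenericSuperclass())
                        .getActualTypeArguments()[0];
    }

    // Inside a generic method, the captured type is only the TypeVariable K,
    // which is why a hint written as TypeHint<Tuple3<String, K, Float>> cannot
    // resolve K at runtime.
    static <K> Type captureInsideGenericMethod() {
        return new Hint<K>() {}.captured;
    }

    public static void main(String[] args) {
        Type concrete = new Hint<String>() {}.captured;
        Type generic = captureInsideGenericMethod();
        System.out.println(concrete);                        // class java.lang.String
        System.out.println(generic instanceof TypeVariable); // true
    }
}
```

So at call sites where K is concrete the hint works, but written inside the generic method it never can.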

I then tried some other options to get the type K to work with the Flink operators. This is where I'm stuck; I can't remember exactly what I tweaked to trigger the originally reported exception. But the code above is a much more complete representation of what I was trying to do.

What's the right way to get generics like this working with Flink? Why does the above not work currently? (Everything I try throws some sort of exception.)
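For reference, the intended semantics of {{convertToFractionalRank}} can be sketched without Flink at all. This is a plain-Java sketch of the same computation (highest total score maps to fractional rank 0, lowest to 1); like the pipeline above, it assumes at least two keys, since {{numKeys - 1}} would otherwise divide by zero:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FractionalRank {
    // Map each key's total score onto a descending fractional rank in [0, 1]:
    // sort keys by score descending, then scale rank from [0, n-1] to [0, 1].
    static <K> Map<K, Float> fractionalRank(Map<K, Float> keyToScore) {
        List<K> sorted = keyToScore.entrySet().stream()
                .sorted(Map.Entry.<K, Float>comparingByValue().reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        int n = sorted.size();
        Map<K, Float> out = new LinkedHashMap<>();
        for (int rank = 0; rank < n; rank++) {
            out.put(sorted.get(rank), rank / (float) (n - 1));
        }
        return out;
    }
}
```

Note that because K is only ever used as a map key here, plain Java has no trouble with the generic; the difficulty arises from Flink needing runtime TypeInformation for K.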


was (Author: lukehutch):
[~greghogan] I partially reconstructed what I was doing before the originally reported exception was triggered. I was wrong about not joining on the generic type; I was actually using it as the join key. This works in the smaller tests I have tried, but I cannot get the more extensive example below to work.

Given the following code:

{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MainTest {

    public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
        // Sum within each key
        // Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Float>> blank_key_totScore =
                key_score
                        .groupBy(0).sum(1)
                        // Prepend with "" to prep for join
                        .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1))
                        // .returns(TypeInformation.of(new TypeHint<Tuple3<String, K, Float>>(){}))  // (2)
                        ;

        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys =
                blank_key_totScore
                        .distinct(0)  // (1)
                        .map(t -> new Tuple2<String, Integer>("", 1))
                        .groupBy(0).sum(1);

        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Float> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Float, Integer> t : iter) {
                                int numKeys = t.f3; // Same for all tuples
                                float fracRank = rank / (float) (numKeys - 1);
                                out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
                                rank++;
                            }
                        })
                .name("convert problem severity scores into building scores");
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
                new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
                new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
        DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
        System.out.println(ds2.collect());
    }
}
{code}

This exception is thrown at the line marked {{// (1)}}:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
	at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
	... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
	... 6 more
{noformat}

If you uncomment line {{// (2)}}, you instead get:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(org.apache.flink.api.java.DataSet)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
	at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
	at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
	at com.rentlogic.buildingscores.flink.MainTest$1.<init>(MainTest.java:24)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:24)
	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:71)
{noformat}

I then tried some other options to get the type K to work with the Flink operators. This is where I'm stuck; I can't remember exactly what I tweaked to trigger the originally reported exception. But the code above is a much more complete representation of what I was trying to do.

What's the right way to get generics like this working with Flink? Why does the above not work currently? (Everything I try throws some sort of exception.)

> Type checking fails with generics, even when concrete type of field is not needed
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-6114
>                 URL: https://issues.apache.org/jira/browse/FLINK-6114
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> The Flink type checker does not allow generic types to be used in any field of a tuple when a join is being executed, even if the generic is not in a field that is involved in the join.
> I have a type Tuple3<String, K, Float>, which contains a generic type parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is well-defined as String. However, this gives me the following error:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet mypkg.MyClass.method(params)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
> {noformat}
> The code compiles fine, however -- the static type system is able to correctly resolve the types in the surrounding code.
> Really only the fields that are affected by joins (or groupBy, aggregation etc.) should be checked for concrete types in this way.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
