flink-user mailing list archives

From Lea Helmers <lea.helm...@gmail.com>
Subject Fwd: Problem applying a groupReduce function to a grouped data set
Date Sun, 01 Nov 2015 12:52:47 GMT
Hi!

When I try to apply a GroupReduce function to a grouped data set, I get a compile error.

The data sets are created like this:

DataSet<Tuple3<String, String, String>> actorsTemp =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
                .fieldDelimiter("\t")
                .includeFields("1110")
                .types(String.class, String.class, String.class);

DataSet<Tuple3<String, String, String>> actresses =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
                .fieldDelimiter("\t")
                .includeFields("1110")
                .types(String.class, String.class, String.class);

DataSet<Tuple3<Float, Float, String>> ratings =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
                .fieldDelimiter("\t")
                .includeFields("0111")
                .types(Float.class, Float.class, String.class)
                .filter(new NumberVotesFilter());

// merge actors and actresses
DataSet<Tuple3<String, String, String>> actors = actorsTemp.union(actresses);

// create weighted rating
DataSet<Tuple2<String, Float>> weightedRatings =
        ratings.map(new WeightedRatingCalculator());
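The `includeFields` masks above select columns by position: `"1110"` keeps the first three tab-separated fields, and `"0111"` skips the first field and keeps the next three, which matches `ratings` being typed `Tuple3<Float, Float, String>`. The plain-Java sketch below (no Flink) illustrates that selection with a hypothetical helper `selectFields` on made-up lines. It assumes columns past the end of the mask are not read, handles only `'1'`/`'0'` mask characters, and ignores quoting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class IncludeFieldsSketch {

    /**
     * Hypothetical helper mimicking a positional field mask such as "1110":
     * field i of the line is kept iff mask.charAt(i) == '1'. Fields beyond the
     * end of the mask are dropped. Quoting/escaping is not handled.
     */
    public static List<String> selectFields(String line, String delimiter, String mask) {
        // Split on the literal delimiter, keeping trailing empty fields.
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < fields.length && i < mask.length(); i++) {
            if (mask.charAt(i) == '1') {
                kept.add(fields[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Invented sample lines; the real column layout of the .tsv files is not shown.
        String actorLine = "Doe\tJane\tSome Movie (2001)\textra";
        String ratingLine = "0000000125\t1234\t7.5\tSome Movie (2001)";
        System.out.println(selectFields(actorLine, "\t", "1110"));  // [Doe, Jane, Some Movie (2001)]
        System.out.println(selectFields(ratingLine, "\t", "0111")); // [1234, 7.5, Some Movie (2001)]
    }
}
```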

This is what I'm trying in the main method:

actors.map(new JoinNames())
        .join(weightedRatings)
        .where(1).equalTo(0)
        .projectFirst(0).projectSecond(1)
        .groupBy(0)
        .reduceGroup(new MeanRatingCalculator())
        .first(10).print();
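To make the intended semantics of the join step concrete, here is a plain-Java sketch (no Flink) of `join(weightedRatings).where(1).equalTo(0).projectFirst(0).projectSecond(1)`. Since `JoinNames` is not shown, it assumes that `JoinNames` emits `(actorName, movieTitle)` pairs and that field 0 of `weightedRatings` is the movie title. The `Pair` class and `joinAndProject` method are hypothetical stand-ins, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinProjectSketch {

    /** Minimal pair type standing in for Flink's Tuple2 in this sketch. */
    public static final class Pair<A, B> {
        public final A f0;
        public final B f1;
        public Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    /**
     * Inner join of credits (actorName, movieTitle) with ratings (movieTitle, rating)
     * on credits.f1 == ratings.f0 (like where(1).equalTo(0)), followed by the
     * projection first.f0, second.f1 -> (actorName, rating).
     */
    public static List<Pair<String, Float>> joinAndProject(List<Pair<String, String>> credits,
                                                          List<Pair<String, Float>> ratings) {
        // Index the second input by its join key (field 0 = movie title).
        Map<String, List<Float>> ratingsByMovie = new HashMap<>();
        for (Pair<String, Float> r : ratings) {
            ratingsByMovie.computeIfAbsent(r.f0, k -> new ArrayList<>()).add(r.f1);
        }
        List<Pair<String, Float>> result = new ArrayList<>();
        for (Pair<String, String> c : credits) {
            List<Float> matches = ratingsByMovie.get(c.f1);
            if (matches == null) {
                continue; // inner join: a credit without a matching rating produces no output
            }
            for (Float rating : matches) {
                result.add(new Pair<>(c.f0, rating)); // keep first.f0 and second.f1
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Pair<String, String>> credits = new ArrayList<>();
        credits.add(new Pair<>("Jane Doe", "Movie A"));
        credits.add(new Pair<>("John Roe", "Unrated Movie"));
        List<Pair<String, Float>> ratings = new ArrayList<>();
        ratings.add(new Pair<>("Movie A", 7.5F));
        System.out.println(joinAndProject(credits, ratings)); // [(Jane Doe,7.5)]
    }
}
```

The result of this step is exactly the `(String, Float)` shape that `MeanRatingCalculator` is declared to consume.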


And here is the GroupReduce function I wrote:

public static class MeanRatingCalculator implements
        GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
                       Collector<Tuple3<String, Float, Integer>> out) throws Exception {

        String name = null;
        float ratings = 0F;
        int numberOfMovies = 0;
        for (Tuple2<String, Float> a : ratedActors) {
            // store the name
            name = a.f0;
            // update the sum of the ratings and number of movies
            ratings += a.f1;
            numberOfMovies++;
        }
        // emit name, average rating and number of films
        out.collect(new Tuple3<String, Float, Integer>(name,
                ratings / numberOfMovies, numberOfMovies));
    }
}
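The contract that `groupBy(0).reduceGroup(...)` relies on can be sketched without Flink: the input is partitioned by the grouping key, the function is called once per group with an `Iterable` over that group, and it emits its results through a collector. The sketch below reproduces the logic of `MeanRatingCalculator` using hypothetical stand-ins (`Collector`, `Rated`, `MeanRating`, `groupByNameAndReduce`); it illustrates the idea and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupReduceSketch {

    /** Hypothetical stand-in for a collector that receives emitted records. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Input record (name, rating) for this sketch. */
    public static final class Rated {
        public final String name;
        public final float rating;
        public Rated(String name, float rating) { this.name = name; this.rating = rating; }
    }

    /** Output record (name, mean rating, number of movies) for this sketch. */
    public static final class MeanRating {
        public final String name;
        public final float mean;
        public final int count;
        public MeanRating(String name, float mean, int count) {
            this.name = name; this.mean = mean; this.count = count;
        }
        @Override public String toString() { return "(" + name + "," + mean + "," + count + ")"; }
    }

    /** Same logic as MeanRatingCalculator.reduce: one pass over the group, one output record. */
    static void reduce(Iterable<Rated> group, Collector<MeanRating> out) {
        String name = null;
        float sum = 0F;
        int count = 0;
        for (Rated r : group) {
            name = r.name;
            sum += r.rating;
            count++;
        }
        out.collect(new MeanRating(name, sum / count, count));
    }

    /** Partition by name, then call reduce once per group, collecting the results. */
    public static List<MeanRating> groupByNameAndReduce(List<Rated> input) {
        Map<String, List<Rated>> groups = new LinkedHashMap<>(); // keeps first-seen key order
        for (Rated r : input) {
            groups.computeIfAbsent(r.name, k -> new ArrayList<>()).add(r);
        }
        List<MeanRating> results = new ArrayList<>();
        for (List<Rated> group : groups.values()) {
            reduce(group, results::add); // each group has at least one element, so count > 0
        }
        return results;
    }

    public static void main(String[] args) {
        List<Rated> input = new ArrayList<>();
        input.add(new Rated("Jane Doe", 6.0F));
        input.add(new Rated("John Roe", 5.0F));
        input.add(new Rated("Jane Doe", 8.0F));
        System.out.println(groupByNameAndReduce(input)); // [(Jane Doe,7.0,2), (John Roe,5.0,1)]
    }
}
```

Because a group is only ever created when it receives an element, the division in `reduce` never sees a zero count.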


I get the following error message when I try to compile the code:

java: method reduceGroup in class org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be applied to given types;
  required: org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
  found: de.tub.dima.TopActors.MeanRatingCalculator
  reason: no instance(s) of type variable(s) R exist so that argument type de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter type org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
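The plain `Tuple` in the required type suggests that, after the join projection, the element type of the grouped data set was inferred as `Tuple` rather than `Tuple2<String, Float>`, so a function declared for `Tuple2<String, Float>` no longer fits. A likely cause, stated as an assumption to check against the Flink version in use: `projectFirst`/`projectSecond` are generic methods whose output tuple type javac cannot infer in a chained call. Supplying it explicitly, e.g. `.projectFirst(0).<Tuple2<String, Float>>projectSecond(1)`, or replacing the projection with a `JoinFunction` that returns `Tuple2<String, Float>`, should let `reduceGroup` accept `MeanRatingCalculator`. The self-contained sketch below reproduces the same Java inference problem with hypothetical stand-in classes (`Tuple`, `Tuple2`, `Grouping`, `project`, `GroupReducer`), not the real Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class ProjectionTypeHintSketch {

    // Hypothetical stand-ins for the types involved; NOT the Flink API.
    public static class Tuple {}

    public static class Tuple2<A, B> extends Tuple {
        public final A f0;
        public final B f1;
        public Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    public interface GroupReducer<IN, OUT> {
        OUT reduce(Iterable<IN> values);
    }

    public static class Grouping<T> {
        private final List<T> elements;
        Grouping(List<T> elements) { this.elements = elements; }

        // Like reduceGroup: the function's input type must match T exactly.
        public <R> R reduceGroup(GroupReducer<T, R> reducer) {
            return reducer.reduce(elements);
        }
    }

    // Like a projection: OUT appears only in the return type, so when the result is
    // used as the receiver of another call, javac infers OUT from its bound (Tuple).
    @SuppressWarnings("unchecked")
    public static <OUT extends Tuple> Grouping<OUT> project(List<? extends Tuple> rows) {
        List<OUT> out = new ArrayList<>();
        for (Tuple row : rows) {
            out.add((OUT) row); // unchecked: the caller vouches for the element type
        }
        return new Grouping<>(out);
    }

    public static class MeanReducer implements GroupReducer<Tuple2<String, Float>, Float> {
        @Override
        public Float reduce(Iterable<Tuple2<String, Float>> values) {
            float sum = 0F;
            int n = 0;
            for (Tuple2<String, Float> t : values) { sum += t.f1; n++; }
            return sum / n;
        }
    }

    public static Float meanWithTypeHint(List<? extends Tuple> rows) {
        // Without the explicit type argument this does not compile, for the same reason
        // as the error above (required GroupReducer<Tuple,R>, found MeanReducer):
        //   project(rows).reduceGroup(new MeanReducer());
        return ProjectionTypeHintSketch.<Tuple2<String, Float>>project(rows)
                .reduceGroup(new MeanReducer());
    }

    public static void main(String[] args) {
        List<Tuple> rows = new ArrayList<>();
        rows.add(new Tuple2<>("Jane Doe", 6.0F));
        rows.add(new Tuple2<>("Jane Doe", 8.0F));
        System.out.println(meanWithTypeHint(rows)); // 7.0
    }
}
```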


I can't figure out what the problem might be and would be very grateful for
any help! I hope I have given all the necessary information. I'm using
Ubuntu 14.04 and IntelliJ IDEA as my IDE.

Thank you very much,
Lea
