flink-user mailing list archives

From Martin Junghanns <m.jungha...@mailbox.org>
Subject Re: Fwd: Problem applying a groupReduce function to a grouped data set
Date Sun, 01 Nov 2015 19:19:46 GMT
Hi,

Just an idea: the source code documentation states that projectFirst
and projectSecond lose type information, which could explain why
reduceGroup expects a GroupReduceFunction<Tuple, R>.
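
A stdlib-only Java model of that explanation follows. Tuple, Tuple2, Grouping and project are hypothetical stand-ins, not Flink classes. When a method's output type OUT is a type parameter bounded by Tuple and appears in none of its arguments, a chained call gives the compiler nothing to infer it from. OUT then falls back to its bound, Tuple, and reduceGroup only accepts a function over Tuple, which matches the "required: GroupReduceFunction<Tuple,R>" part of the error. An explicit type witness pins OUT down. If Flink's projectSecond declares its output type the same way, then `.projectFirst(0).<Tuple2<String, Float>>projectSecond(1)` might work as an alternative to a JoinFunction. That is an assumption to check against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ProjectionInferenceDemo {

    // Hypothetical stand-ins for Flink's tuple classes (not the real Flink types).
    static class Tuple {}

    static class Tuple2<A, B> extends Tuple {
        final A f0;
        final B f1;

        Tuple2(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Hypothetical stand-in for a grouped data set with element type T.
    static class Grouping<T> {
        private final List<T> elements;

        Grouping(List<T> elements) {
            this.elements = elements;
        }

        // Generics are invariant: only a reducer whose input is exactly List<T> is accepted.
        <R> List<R> reduceGroup(Function<List<T>, R> reducer) {
            List<R> out = new ArrayList<>();
            out.add(reducer.apply(elements));
            return out;
        }
    }

    // Shaped like a projection: OUT is bounded by Tuple and occurs in no argument,
    // so only a target type or an explicit witness can fix it.
    @SuppressWarnings("unchecked")
    static <OUT extends Tuple> Grouping<OUT> project(List<? extends Tuple> rows) {
        return new Grouping<>((List<OUT>) (List<?>) rows);
    }

    static List<Float> demo() {
        List<Tuple> rows = new ArrayList<>();
        rows.add(new Tuple2<>("Alice", 8.0f));
        rows.add(new Tuple2<>("Alice", 6.0f));

        // project(rows).reduceGroup(...) would infer OUT = Tuple (the bound),
        // so a reducer over Tuple2<String, Float> would be rejected at compile time.
        // The explicit type witness below sets OUT = Tuple2<String, Float>.
        return ProjectionInferenceDemo.<Tuple2<String, Float>>project(rows)
                .reduceGroup(group -> {
                    float sum = 0f;
                    for (Tuple2<String, Float> t : group) {
                        sum += t.f1;
                    }
                    return sum / group.size();
                });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [7.0]
    }
}
```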

I found an example [1] that calls .types() to declare the result types,
but this method is deprecated. What I would try instead is replacing
projectFirst and projectSecond with a JoinFunction that builds the
Tuple2<String, Float> "manually", like so:

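import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;

actors.map(new JoinNames())
    .join(weightedRatings)
    .where(1).equalTo(0)
    // TypeLeft is a placeholder for the output type of JoinNames();
    // its f0 is assumed to hold the actor name
    .with(new JoinFunction<TypeLeft, Tuple2<String, Float>, Tuple2<String, Float>>() {
        @Override
        public Tuple2<String, Float> join(
                TypeLeft left,
                Tuple2<String, Float> right) throws Exception {
            // name from the left input, weighted rating from the right input
            return new Tuple2<>(left.f0, right.f1);
        }
    })
    // both fields are copied unchanged into the output tuple
    .withForwardedFieldsFirst("f0")
    .withForwardedFieldsSecond("f1")
    .groupBy(0)
    .reduceGroup(new MeanRatingCalculator())
    .first(10)
    .print();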
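
For reference, here is a plain-Java sketch (no Flink) of what the pipeline above computes once the join emits (name, rating) pairs: an inner join on the movie key, a group-by on the name, and the mean and count per group, as MeanRatingCalculator does. The classes Credit, Rating and MeanRating, the method run and the sample data are hypothetical. The left side is assumed to carry (name, movie) after JoinNames(), which the thread does not show. first(n) on an unsorted data set should not be relied on for any particular order; the sketch keeps insertion order only so its output is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinGroupMeanSketch {

    // Hypothetical left rows: (actor name, movie title), assumed output of JoinNames().
    static final class Credit {
        final String name;
        final String movie;
        Credit(String name, String movie) { this.name = name; this.movie = movie; }
    }

    // Hypothetical right rows: (movie title, weighted rating), like weightedRatings.
    static final class Rating {
        final String movie;
        final float rating;
        Rating(String movie, float rating) { this.movie = movie; this.rating = rating; }
    }

    // Result rows: (name, mean rating, number of movies), like MeanRatingCalculator emits.
    static final class MeanRating {
        final String name;
        final float mean;
        final int count;
        MeanRating(String name, float mean, int count) {
            this.name = name; this.mean = mean; this.count = count;
        }
        @Override
        public String toString() { return "(" + name + "," + mean + "," + count + ")"; }
    }

    static List<MeanRating> run(List<Credit> credits, List<Rating> ratings, int limit) {
        // Index the right side by the join key (where(1).equalTo(0) in the Flink job).
        Map<String, List<Float>> ratingsByMovie = new HashMap<>();
        for (Rating r : ratings) {
            ratingsByMovie.computeIfAbsent(r.movie, k -> new ArrayList<>()).add(r.rating);
        }

        // Join: each match yields (left name, right rating); accumulate per name.
        Map<String, Float> sums = new LinkedHashMap<>();
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Credit c : credits) {
            List<Float> matches = ratingsByMovie.get(c.movie);
            if (matches == null) {
                continue; // inner join: credits without a rated movie are dropped
            }
            for (float rating : matches) {
                sums.merge(c.name, rating, Float::sum);
                counts.merge(c.name, 1, Integer::sum);
            }
        }

        // Reduce each group to (name, mean, count) and keep at most `limit` rows.
        List<MeanRating> out = new ArrayList<>();
        for (Map.Entry<String, Float> e : sums.entrySet()) {
            if (out.size() == limit) {
                break;
            }
            int n = counts.get(e.getKey());
            out.add(new MeanRating(e.getKey(), e.getValue() / n, n));
        }
        return out;
    }

    static List<MeanRating> demo() {
        List<Credit> credits = new ArrayList<>();
        credits.add(new Credit("Alice", "M1"));
        credits.add(new Credit("Alice", "M2"));
        credits.add(new Credit("Bob", "M2"));
        credits.add(new Credit("Carol", "M9")); // no rating, so no output row
        List<Rating> ratings = new ArrayList<>();
        ratings.add(new Rating("M1", 8.0f));
        ratings.add(new Rating("M2", 6.0f));
        return run(credits, ratings, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [(Alice,7.0,2), (Bob,6.0,1)]
    }
}
```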

Hope this helps.

Best,
Martin

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/examples.html#relational-query

On 01.11.2015 13:52, Lea Helmers wrote:
> Hi!
>
> When I try to apply a groupReduce function to a data set I get an error.
>
> The data set is created like this:
>
> DataSet<Tuple3<String, String, String>> actorsTemp =
>         env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
>                 .fieldDelimiter("\t")
>                 .includeFields("1110")
>                 .types(String.class, String.class, String.class);
>
> DataSet<Tuple3<String, String, String>> actresses =
>         env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
>                 .fieldDelimiter("\t")
>                 .includeFields("1110")
>                 .types(String.class, String.class, String.class);
>
> DataSet<Tuple3<Float, Float, String>> ratings =
>         env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
>                 .fieldDelimiter("\t")
>                 .includeFields("0111")
>                 .types(Float.class, Float.class, String.class)
>                 .filter(new NumberVotesFilter());
>
> // merge actors and actresses
> DataSet<Tuple3<String, String, String>> actors = actorsTemp.union(actresses);
> // create weighted rating
> DataSet<Tuple2<String, Float>> weightedRatings =
>         ratings.map(new WeightedRatingCalculator());
>
> THIS IS WHAT I'M TRYING IN THE MAIN METHOD:
>
>                      actors.map(new JoinNames())
>                            .join(weightedRatings)
>                            .where(1).equalTo(0)
>                            .projectFirst(0).projectSecond(1)
>                            .groupBy(0)
>                            .reduceGroup(new MeanRatingCalculator())
>                            .first(10).print();
>
>
> And here is the GroupReduce function I wrote:
>
> public static class MeanRatingCalculator implements
>         GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {
>
>     public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
>             Collector<Tuple3<String, Float, Integer>> out) throws Exception {
>
>         String name = null;
>         Float ratings = 0F;
>         int numberOfMovies = 0;
>         for (Tuple2<String, Float> a : ratedActors) {
>             // store the name
>             name = a.f0;
>             // update the sum of the ratings and number of movies
>             ratings += a.f1;
>             numberOfMovies++;
>         }
>         // emit name, average rating and number of films
>         out.collect(new Tuple3<String, Float, Integer>(name,
>                 ratings / (float) numberOfMovies, numberOfMovies));
>     }
> }
>
>
> I get the following error message when I try to compile the code:
>
> java: method reduceGroup in class org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be applied to given types;
>   required: org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
>   found: de.tub.dima.TopActors.MeanRatingCalculator
>   reason: no instance(s) of type variable(s) R exist so that argument type de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter type org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
>
>
> I can't figure out what the problem might be and would be very grateful
> for any help!! I hope I have given all the necessary information. I'm
> using Ubuntu 14.04 and IntelliJ Idea as IDE.
>
> Thank you very much,
>
> Lea
