flink-user mailing list archives

From Frank Dekervel <ker...@gmail.com>
Subject Re: scala version of flink mongodb example
Date Thu, 08 Sep 2016 18:54:42 GMT
Hello Fabian,

Thanks, your solution does indeed work. However, I don't understand why.
When I replace the lambda with an explicit function:

def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]) : String = {
    return pair._1.toString
}
input.map mapfunc2


I get the error below, which seems to indicate that my call matches both
the Scala version (the first overloaded method in the error message) and the
Java version (the second one, which takes a MapFunction).

This is also the error I got when doing the following (which looks the
most logical to me):

def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
    return t1.toString()
}
input.map mapfunc

It would seem logical to me to decompose the pair into two separate arguments
(which is what the Java version of the example at
https://github.com/okkam-it/flink-mongodb-test also does).

And this is the error message:

both method map in class DataSet of type [R](fun:
((com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable))
=> R)(implicit evidence$4:
org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit
evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
and method map in class DataSet of type [R](mapper:
org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable,
com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2:
org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit
evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
match expected type ?
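
For reference, here is a minimal sketch of the difference between a
two-parameter function and a function over a pair. It uses a plain List of
String pairs as a stand-in for the DataSet of BSONWritable pairs (an
assumption, so that it runs without Flink or MongoDB), and the object name
PairMapSketch is made up for illustration. It does not reproduce the
overload ambiguity above; it only shows forms that type-check against a
map that takes a single-argument function.

```scala
object PairMapSketch {
  // Stand-in for DataSet[(BSONWritable, BSONWritable)] (assumption: plain strings in a List).
  val input: List[(String, String)] = List(("key1", "value1"), ("key2", "value2"))

  // Takes the pair as ONE argument, which is what map passes in.
  def mapfunc2(pair: (String, String)): String = pair._1

  // Takes TWO arguments: as a function value this is a Function2,
  // not a Function1 over a pair, so it cannot be handed to map directly.
  def mapfunc(t1: String, t2: String): String = t1

  // Option 1: wrap the method call in a lambda.
  val viaLambda: List[String] = input.map(pair => mapfunc2(pair))

  // Option 2: decompose the pair with a pattern-matching function literal.
  val viaCase: List[String] = input.map { case (t1, t2) => mapfunc(t1, t2) }

  // Option 3: turn the two-argument function into a function over a pair.
  val asFunction2: (String, String) => String = mapfunc
  val viaTupled: List[String] = input.map(asFunction2.tupled)

  def main(args: Array[String]): Unit = {
    println(viaLambda)
    println(viaCase)
    println(viaTupled)
  }
}
```

All three variants yield the first element of each pair. Whether each form
also resolves cleanly against the two overloads of DataSet.map shown in the
error message is something I have not checked.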

Thanks!
Frank


On Thu, Sep 8, 2016 at 6:56 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Frank,
>
> input should be of DataSet[(BSONWritable, BSONWritable)], so a
> Tuple2[BSONWritable, BSONWritable], right?
>
> Something like this should work:
>
> input.map( pair => pair._1.toString )
>
> Pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key
> of the pair.
>
> Alternatively you can also add an import
> org.apache.flink.api.scala.extensions._
>
> and then you can do
>
> input.mapWith { case (x, y) => x }
>
> Best, Fabian
>
>
> 2016-09-08 18:30 GMT+02:00 Frank Dekervel <kervel@gmail.com>:
>
>> Hello,
>>
>> I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format
>> working in Scala. However, I get lost in the Scala generics system ...
>> Could somebody help me?
>>
>> Code is below. Neither version works (compile error at the "map" call),
>> either because the method is not applicable or because of an ambiguous
>> reference to the overloaded method map (Flink 1.0.3).
>>
>> Thanks already
>> greetings,
>> Frank
>>
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.scala.DataSet;
>> import org.apache.flink.api.scala.ExecutionEnvironment;
>> import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
>>
>> import org.apache.hadoop.mapred.JobConf;
>> import org.bson.BSONObject;
>>
>> import com.mongodb.BasicDBObject;
>> import com.mongodb.hadoop.io.BSONWritable;
>> import com.mongodb.hadoop.mapred.MongoInputFormat;
>>
>> val hdIf = new HadoopInputFormat(new MongoInputFormat(),classOf[BSONWritable],
>> classOf[BSONWritable], new JobConf())
>>
>> hdIf.getJobConf().set("mongo.input.uri",
>>     "mongodb://localhost:27017/handling.event");
>>
>> val input = env.createInput(hdIf);
>>
>> def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
>>     return t1.toString()
>> }
>>
>> // does not work
>> //input.map mapfunc
>>
>> // does not work either
>> input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
>> // does not work either
>> //input.map ( (t1, t2) => t1 )
>>
>>
>
