flink-user mailing list archives

From Frank Dekervel <ker...@gmail.com>
Subject scala version of flink mongodb example
Date Thu, 08 Sep 2016 16:30:50 GMT
Hello,

I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format
working in Scala. However, I'm getting lost in the Scala generics system.
Could somebody help me?

The code is below. None of the versions compiles: the "map" call fails
either with "method not applicable" or with "ambiguous reference to
overloaded method map" (Flink 1.0.3).

Thanks in advance,
greetings,
Frank


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;

import org.apache.hadoop.mapred.JobConf;
import org.bson.BSONObject;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;

val hdIf = new HadoopInputFormat(
    new MongoInputFormat(), classOf[BSONWritable], classOf[BSONWritable], new JobConf())

hdIf.getJobConf().set("mongo.input.uri",
    "mongodb://localhost:27017/handling.event");

val input = env.createInput(hdIf);

def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
    return t1.toString()
}

// does not work
//input.map mapfunc

// does not work either
input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
// does not work either
//input.map ( (t1, t2) => t1 )
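
A likely cause of both errors, as far as can be told from the code above: createInput on a Hadoop input format yields a data set of key/value pairs, so map expects a function of one argument (the pair), not of two. The sketch below shows the same situation in plain Scala. The List, its String element types, the sample values and the object name TupleMapSketch are stand-ins invented for illustration; only mapfunc mirrors the method in the question.

```scala
// Plain-Scala stand-in: a List of pairs plays the role of the data set
// of (key, value) pairs. Element types and sample values are made up.
object TupleMapSketch {
  val input: List[(String, String)] =
    List(("key1", "value1"), ("key2", "value2"))

  // A two-parameter method, shaped like mapfunc in the question.
  def mapfunc(t1: String, t2: String): String = t1

  // map expects a function of ONE argument (the pair), so a
  // two-argument function does not fit:
  // input.map((t1: String, t2: String) => t1)   // does not compile

  // Option 1: take the pair as a single parameter and use its fields.
  val viaFields: List[String] = input.map(t => mapfunc(t._1, t._2))

  // Option 2: turn the two-argument method into a function of one pair.
  val viaTupled: List[String] = input.map((mapfunc _).tupled)

  // Option 3: destructure the pair with a pattern-matching function.
  val viaCase: List[String] = input.map { case (t1, t2) => mapfunc(t1, t2) }

  def main(args: Array[String]): Unit = {
    println(viaFields)
    println(viaTupled)
    println(viaCase)
  }
}
```

Applied to the Flink code, option 1 would read input.map(t => t._1.toString). It is probably the safest of the three there, because the Scala DataSet map is overloaded (it also accepts a MapFunction) and the compiler may reject the pattern-matching form of option 3 on an overloaded method. The Scala API also needs import org.apache.flink.api.scala._ in scope so that the implicit TypeInformation for the result type can be found.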
