flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Zhun Shen <shenzhunal...@gmail.com>
Subject Using GeoIP2 in Flink Streaming
Date Mon, 04 Apr 2016 14:52:29 GMT
Hi there,

In my case, I want to use GeoIP2 in Flink Streaming, I know I need to serialize geoip2 related
classes using Kryo. But I did know how to do it.

Flink version: 1.0.0
Kafka version: 0.9.0.0
Deploy Mode: Local

My demo code as below:

        File database = new File(“/home/user/GeoIP2-City.mmdb");
        final DatabaseReader reader = new DatabaseReader.Builder(database).build();
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),
properties));

        messageStream
                .rebalance()
                .map(new MapFunction<String, String>() {
                    public String map(String value) throws Exception {

                        InetAddress ipAddress = InetAddress.getByName(value);
                        CityResponse response = reader.city(ipAddress);
                        Country country = response.getCountry();
                        return "Kafka and Flink says: " + value + " " + country;
                    }
                }).print();

        env.execute(); 


I got the error below:

Object FlinkTest$1@7c7d3c46 not serializable
	org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
	org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
	org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
	org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
	org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
	FlinkTest.main(FlinkTest.java:36)

Any one can help me ?
Mime
View raw message