flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jay Vyas <jayunit100.apa...@gmail.com>
Subject Re: Using Flink with Redis question
Date Fri, 04 Sep 2015 18:11:24 GMT
Maybe wrapping Jedis with a serializable class will do the trick?

But in general is there a way to reference jar classes  in flink apps without serializable
them?


> On Sep 4, 2015, at 1:36 PM, Jerry Peng <jerry.boyang.peng@gmail.com> wrote:
> 
> Hello,
> 
> So I am trying to use jedis (redis java client) with Flink streaming api, but I get an
exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an
error.
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> 	at org.apache.flink.client.program.Client.run(Client.java:278)
> 	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a
not serializable
> 	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> 	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
> 	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
> 	at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
> 	at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:483)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> 	... 6 more
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
> 	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
> 	... 16 more
> 
> 
> 
> 
> so my code I am using: 
> 
> public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, String,String,String,String>
>     , Tuple6<String, String,String,String,String,String>> {
>  private Jedis jedis;
>  private HashMap<String, String> ad_to_campaign;
> 
>  public RedisJoinBolt(String jedisServer) {
>   //initialize jedis
>   this.jedis = new Jedis(jedisServer);
>  }
> 
>  @Override
>  public void flatMap(Tuple5<String,String,String,String,String> input,
>            Collector<Tuple6<String,String,String,String,String,String>> out)
throws Exception {
> .
> .
> .
> 
> Any one know a fix for this?

Mime
View raw message