flink-user mailing list archives

From Vijay Srinivasaraghavan <vijikar...@yahoo.com>
Subject ElasticsearchSink Serialization Error
Date Fri, 28 Apr 2017 02:57:44 GMT
I am seeing the error below when I try to use ElasticsearchSink. It complains about serialization, and the cause appears to be my "IndexRequestBuilder" implementation. I tried the suggestion in http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability (changing the anonymous class to a concrete class), but it did not help. However, when I call "ElasticsearchSink<>(config, transports, null)", passing "null" for the "IndexRequestBuilder", the serialization error does not occur. This suggests the problem lies in the IndexRequestBuilder implementation, but I have not been able to get any further.
Could someone please let me know the right way to use the ElasticsearchSink API?
Build details:
Flink 1.2.0
Elasticsearch 5.3.0
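The observation that passing null makes the error go away is consistent with how Java serialization works: it follows every non-transient reference from the object being written, so a null field contributes nothing, while a field pointing at a non-serializable object fails the whole write. Below is a minimal plain-Java sketch of that behaviour. `SinkLike` and `NotSerializableBuilder` are hypothetical stand-ins, not Flink classes, and the sketch assumes (without checking Flink's source) that the sink keeps the builder in an ordinary field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NullFieldDemo {

    // Hypothetical stand-in for a builder that is not serializable.
    static class NotSerializableBuilder {}

    // Hypothetical stand-in for a sink that stores the user's builder in a field.
    static class SinkLike implements Serializable {
        private final Object builder;

        SinkLike(Object builder) {
            this.builder = builder;
        }
    }

    // Returns true if Java serialization of obj (and everything it references) succeeds.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new SinkLike(null)));                         // true
        System.out.println(serializes(new SinkLike(new NotSerializableBuilder()))); // false
    }
}
```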

Error Message

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)

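The trace shows the check running in ClosureCleaner.clean, called from DataStream.addSink, before the job starts. Roughly speaking, that check tries to Java-serialize the sink function and reports any failure with the generic message above; if the wrapped cause is printed (a "Caused by: java.io.NotSerializableException" line), it may name the offending class. The sketch below runs an equivalent check on a builder by itself. `ensureSerializable`, `JsonHelper` (standing in for something like Gson) and both builder classes are hypothetical. It also shows the transient-plus-lazy-initialization pattern hinted at by the commented-out `transient Gson` field in the code snippet: transient fields are skipped when writing and come back null after deserialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCheckDemo {

    // Hypothetical non-serializable helper (plays the role Gson plays in the snippet).
    static class JsonHelper {
        String toJson(Object o) {
            return "\"" + o + "\"";
        }
    }

    // Holds the helper in a regular field: serialization fails.
    static class EagerBuilder implements Serializable {
        private final JsonHelper helper = new JsonHelper();
    }

    // Marks the helper transient and creates it lazily on first use.
    static class LazyBuilder implements Serializable {
        private transient JsonHelper helper;

        String format(Object o) {
            if (helper == null) {
                helper = new JsonHelper(); // transient fields are null after deserialization
            }
            return helper.toJson(o);
        }
    }

    // Rough imitation of a pre-flight serializability check; the cause names the offending class.
    static void ensureSerializable(Object fn) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(fn);
        } catch (IOException e) {
            throw new IllegalArgumentException(fn.getClass().getName() + " is not serializable: " + e, e);
        }
    }

    public static void main(String[] args) {
        LazyBuilder lazy = new LazyBuilder();
        System.out.println(lazy.format("x")); // uses the lazily created helper
        ensureSerializable(lazy);             // passes: the transient field is skipped
        try {
            ensureSerializable(new EagerBuilder());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // mentions JsonHelper as the culprit
        }
    }
}
```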
Code Snippet
```
private ElasticsearchSink<Result> sinkToElasticSearch(AppConfiguration appConfiguration) throws Exception {
    String host = appConfiguration.getPipeline().getElasticSearch().getHost();
    int port = appConfiguration.getPipeline().getElasticSearch().getPort();
    String cluster = appConfiguration.getPipeline().getElasticSearch().getCluster();
    Map<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", cluster);
    List<TransportAddress> transports = new ArrayList<>();
    transports.add(new InetSocketTransportAddress(host, port)); // archived line is cut off after "host,"; port assumed
    return new ElasticsearchSink<>(config, transports, new ResultIndexRequestBuilder(appConfiguration));
}

public class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>, Serializable {
    private String index;
    private String type;
    //private transient Gson gson = new Gson();
    public ResultIndexRequestBuilder() {}
    public ResultIndexRequestBuilder(AppConfiguration appConfiguration) {
        index = appConfiguration.getPipeline().getElasticSearch().getIndex();
        type = appConfiguration.getPipeline().getElasticSearch().getType();
    }

    @Override
    public IndexRequest createIndexRequest(Result result, RuntimeContext ctx) {
        Gson gson = new Gson();
        String resultAsJson = gson.toJson(result);
        System.out.println(resultAsJson);
        Map<String, String> jsonMap = new HashMap<>();
        jsonMap.put("data", resultAsJson);
        return Requests.indexRequest().index(index).type(type).source(jsonMap);
    }
}
```
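One possibility worth ruling out, assuming ResultIndexRequestBuilder is declared inside the same class as sinkToElasticSearch (the snippet's layout suggests this but does not show it): a non-static inner class normally carries a hidden reference to its enclosing instance, so implementing Serializable is not enough when the enclosing class is not serializable. An anonymous class created in an instance method typically captures the enclosing instance the same way, which would be consistent with the anonymous-to-concrete change not helping. The sketch below uses hypothetical names (PipelineJob, InnerBuilder, StaticBuilder) and plain Java only, with no Flink or Elasticsearch dependency.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical driver class; like most job classes it does NOT implement Serializable.
public class PipelineJob {

    private String defaultIndex = "results";

    // Non-static inner class: it reads defaultIndex from the enclosing instance,
    // so it holds a hidden reference to PipelineJob that serialization must also write.
    public class InnerBuilder implements Serializable {
        String index() {
            return defaultIndex;
        }
    }

    // Static nested class: no hidden reference, only its own fields are serialized.
    public static class StaticBuilder implements Serializable {
        private final String index;

        public StaticBuilder(String index) {
            this.index = index;
        }

        String index() {
            return index;
        }
    }

    // Returns true if Java serialization of obj succeeds.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        PipelineJob job = new PipelineJob();
        System.out.println(serializes(job.new InnerBuilder()));       // false
        System.out.println(serializes(new StaticBuilder("results"))); // true
    }
}
```

If this turns out to be the cause, declaring the builder as `public static class ResultIndexRequestBuilder` (or moving it to its own file) would remove the hidden reference; its own fields are two Strings, which serialize without trouble.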