From: Vijay Srinivasaraghavan
To: Dev, User
Mailing-List: user@flink.apache.org
Date: Fri, 28 Apr 2017 02:57:44 +0000 (UTC)
Message-ID: <657715667.13275061.1493348264669@mail.yahoo.com>
Subject: ElasticsearchSink Serialization Error

Hello,

I am seeing the error below when I try to use ElasticsearchSink. It complains about serialization, and the problem appears to lead to the "IndexRequestBuilder" implementation. I have tried the suggestion mentioned in http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability (changed from an anonymous class to a concrete class), but it did not help. However, when I call "ElasticsearchSink<>(config, transports, null)", passing "null" for the "IndexRequestBuilder", I don't see the serialization error. This suggests the problem could be with the IndexRequestBuilder implementation, but I have not been able to get any further.

Could someone please let me know the right way to use the ElasticsearchSink() API?

Build Details
Flink 1.2.0
Elasticsearch 5.3.0

Error Message

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)

Code Snippet

```
    private ElasticsearchSink<Result> sinkToElasticSearch(AppConfiguration appConfiguration) throws Exception {

        String host = appConfiguration.getPipeline().getElasticSearch().getHost();
        int port = appConfiguration.getPipeline().getElasticSearch().getPort();
        String cluster = appConfiguration.getPipeline().getElasticSearch().getCluster();

        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", cluster);

        List<TransportAddress> transports = new ArrayList<>();
        transports.add(new InetSocketTransportAddress(host, port));

        return new ElasticsearchSink<>(config, transports, new ResultIndexRequestBuilder(appConfiguration));
    }

    public class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>, Serializable {

        private String index;
        private String type;
        //private transient Gson gson = new Gson();

        public ResultIndexRequestBuilder() {}

        public ResultIndexRequestBuilder(AppConfiguration appConfiguration) {
            index = appConfiguration.getPipeline().getElasticSearch().getIndex();
            type = appConfiguration.getPipeline().getElasticSearch().getType();
        }

        @Override
        public IndexRequest createIndexRequest(Result result, RuntimeContext ctx) {
            Gson gson = new Gson();
            String resultAsJson = gson.toJson(result);
            System.out.println(resultAsJson);
            Map<String, String> jsonMap = new HashMap<>();
            jsonMap.put("data", resultAsJson);

            return Requests.indexRequest()
                    .index(index)
                    .type(type)
                    .source(jsonMap);
        }
    }
```

Regards,
Vijay
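
For anyone trying to reproduce this outside Flink, here is a minimal sketch of the kind of serializability check that rejects the sink. It uses plain Java serialization rather than Flink's ClosureCleaner, and the names SerializationCheck, Job, InnerBuilder and NestedBuilder are hypothetical stand-ins, not part of the snippet above. It assumes, without confirmation, that ResultIndexRequestBuilder is declared as a non-static inner class of a class that is not itself Serializable. Under that assumption, the hidden reference to the enclosing instance could be enough to make serialization fail, whereas a static nested class carries no such reference.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    // Hypothetical stand-in for the class that builds the sink; deliberately NOT Serializable.
    static class Job {
        String name = "job";

        // Non-static inner class: holds an implicit reference to the enclosing Job.
        class InnerBuilder implements Serializable {
            String index = "idx";

            // Uses a field of the enclosing instance, so the outer reference is retained.
            String describe() {
                return name + "/" + index;
            }
        }

        // Static nested class: no reference to any Job instance.
        static class NestedBuilder implements Serializable {
            String index = "idx";
        }

        InnerBuilder inner() {
            return new InnerBuilder();
        }
    }

    // Returns null if obj can be written with Java serialization,
    // otherwise the message of the NotSerializableException that was raised.
    static String firstNonSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return String.valueOf(e.getMessage());
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Inner class instance: expected to fail because Job is not Serializable.
        System.out.println("inner:  " + firstNonSerializable(new Job().inner()));
        // Static nested class instance: expected to serialize cleanly.
        System.out.println("nested: " + firstNonSerializable(new Job.NestedBuilder()));
    }
}
```

If the assumption holds for the real code, declaring the builder as a static nested class or as a top-level class is one thing to try. Running the same check against the actual builder instance would show whether it serializes on its own.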