flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mingleizhang <18717838...@163.com>
Subject Re:Re:How to verify the data to Elasticsearch whether correct or not ?
Date Wed, 16 Aug 2017 12:52:34 GMT


Hi, Gordon.


      I am not sure about this, as far as I know. ElasticSearch often store JSON data inside
it as it is convenient to create it's index. As refers to my code below, I stored the protobuf
objects (ActivityInfo which build from activityinfo.proto file) in ElasticSearch. And it is
a binary data stored in it. It is very strange I feel. Flink document just give an example
for it's data which type belongs to a string as JSON.


Peace,
Zhangminglei





At 2017-08-16 13:27:10, "Tzu-Li (Gordon) Tai" <tzulitai@apache.org> wrote:

Hi,


I couldn’t spot anything off in the code snippet you provided. So you should be ok with
this :)


Cheers,
Gordon





On 15 August 2017 at 9:18:59 PM, mingleizhang (18717838093@163.com) wrote:

BTW, ActivityInfo is an PB object build from xxx.proto. And already has it's value setted
to itself.





At 2017-08-15 21:17:00, "mingleizhang" <18717838093@163.com> wrote:

Hi, flink experts!


I sinked my data ( PB objects ) to elasticsearch. I dont know whether the sinked data is correct
or incorrect. The codes like following, Could you help me check it please ? Im not familar
with ES. Now, I want to install a kibana to view my data. But I dont know the below codes
is correct or incorrect. I ran the flink program. it does not give me an error. I just want
to confirm.


// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress,
new ElasticsearchSinkFunction[ActivityInfo] {
def createIndexRequest(element: ActivityInfo): IndexRequest = {
val json = new java.util.HashMap[String, ActivityInfo]
    json.put("data", element)
    Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
  }
override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext, requestIndexer:
RequestIndexer): Unit = {
    requestIndexer.add(createIndexRequest(activityInfo))
  }
}))


Thanks
mingleizhang





 




【网易自营】好吃到爆!鲜香弹滑加热即食,经典13香/麻辣小龙虾仅75元3斤>>
       
Mime
View raw message