flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Re: ElasticsearchSink on DataSet
Date Tue, 09 May 2017 07:30:45 GMT
Just one note: I took a look at your connector and it doesn't provide any
failure-handling mechanism, which would be very useful for us.
It might be worth adding the ActionRequestFailureHandler that the current ES
streaming connector now provides, introduced by commit
https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f86617bb5ef
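For readers unfamiliar with the streaming connector's approach: an ActionRequestFailureHandler is invoked for each failed request and decides whether to drop it, re-add it through the RequestIndexer, or rethrow and fail the sink. The Scala sketch below models that idea for a batch writer without depending on Flink or Elasticsearch. `RejectedException`, `FailureHandler`, `RetryRejected` and `BatchSender` are hypothetical names that only loosely mirror the connector's RetryRejectedExecutionFailureHandler; the real interface's signature should be checked in the Flink documentation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a bulk-rejection error (NOT the real ES exception type).
final class RejectedException(msg: String) extends RuntimeException(msg)

// Hypothetical mirror of the failure-handler idea: for a failed document,
// either re-queue it via `requeue` or throw to fail the job.
trait FailureHandler {
  def onFailure(doc: String, failure: Throwable, requeue: String => Unit): Unit
}

// Re-queue documents rejected because the cluster was overloaded; rethrow anything else.
object RetryRejected extends FailureHandler {
  def onFailure(doc: String, failure: Throwable, requeue: String => Unit): Unit =
    failure match {
      case _: RejectedException => requeue(doc)
      case other                => throw other
    }
}

object BatchSender {
  // Sends each doc with `send` (hypothetical: None on success, Some(error) on failure).
  // Failed docs go through the handler; re-queued docs are retried for at most
  // `maxRetries` additional rounds before giving up.
  def sendWithHandler(
      docs: Seq[String],
      send: String => Option[Throwable],
      handler: FailureHandler,
      maxRetries: Int): Unit = {
    var pending: Seq[String] = docs
    var round = 0
    while (pending.nonEmpty) {
      if (round > maxRetries)
        throw new RuntimeException(s"giving up on ${pending.size} docs")
      val retry = ArrayBuffer.empty[String]
      pending.foreach { doc =>
        send(doc).foreach(err => handler.onFailure(doc, err, d => retry += d))
      }
      pending = retry.toList
      round += 1
    }
  }
}
```

Retries are bounded by rounds so that a persistently overloaded cluster eventually fails the job instead of looping forever.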

Best,
Flavio

On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Thanks a lot for the support!
>
> On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <tzulitai@apache.org> wrote:
>
>> Hi!
>>
>> Thanks for sharing that repo! I think it would be quite a useful
>> contribution to Flink for users, if you’re up to preparing a PR for it
>> :)
>>
>> It also looks like you’ve adopted most of the current ElasticsearchSink
>> APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the
>> ElasticsearchOutputFormat, which fits nicely with the current code :-D
>>
>> Cheers,
>> Gordon
>>
>>
>> On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2007@163.com) wrote:
>>
>> Hi Flavio
>>
>> Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connector
>> It can save a Flink DataSet to Elasticsearch.
>>
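>> import java.net.{InetAddress, InetSocketAddress}
>> import scala.collection.JavaConversions._ // implicit conversions to java.util Map/List
>> import org.apache.flink.api.common.functions.RuntimeContext
>> import org.apache.flink.api.scala._
>> import org.elasticsearch.action.index.IndexRequest
>> import org.elasticsearch.client.Requests
>> // ElasticSearchOutputFormat comes from the repo above; ElasticsearchSinkFunction and
>> // RequestIndexer come from the Flink ES connector (their package depends on the version).
>>
>> val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
>> val hosts = "www.iteblog.com" // comma-separated ES hosts
>> // Connect to each host on the default ES transport port 9300
>> val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList
>> val data: DataSet[String] = ....
>> data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
>>   // Wrap each JSON string in an index request for index "iteblog", type "info"
>>   def createIndexRequest(element: String): IndexRequest =
>>     Requests.indexRequest().index("iteblog").`type`("info").source(element)
>>
>>   override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
>>     indexer.add(createIndexRequest(element))
>> }))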
>>
>>
>> I hope this helps.
>>
>> On 9 May 2017 at 12:59, "Tzu-Li (Gordon) Tai" <tzulitai@apache.org> wrote:
>>
>>
>> Hi Flavio,
>>
>> I don’t think there is a bridge class for this. At the moment you’ll have
>> to implement your own OutputFormat.
>> The ElasticsearchSink is a SinkFunction, which is part of the DataStream
>> API; generally speaking, the DataStream API currently has no bridge to or
>> unification with the DataSet API.
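To make "implement your own OutputFormat" concrete, here is a minimal Scala sketch of the buffering an Elasticsearch OutputFormat would typically do: collect records, send them in bulk once a threshold (like `bulk.flush.max.actions`) is reached, and flush the remainder on close. It only mimics the open/writeRecord/close lifecycle of Flink's OutputFormat so that it runs without dependencies. `BulkClient` and `BufferedEsOutputFormat` are hypothetical names; a real implementation would extend `org.apache.flink.api.common.io.RichOutputFormat` and wrap an actual Elasticsearch client.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an Elasticsearch bulk client (NOT a real ES API).
trait BulkClient {
  def bulk(docs: Seq[String]): Unit
}

// Dependency-free model of an OutputFormat's lifecycle:
// open(taskNumber, numTasks) -> writeRecord(...)* -> close().
class BufferedEsOutputFormat(client: BulkClient, maxActions: Int) {
  require(maxActions > 0, "maxActions must be positive")
  private val buffer = ArrayBuffer.empty[String]

  // Called once per parallel task before any record is written.
  def open(taskNumber: Int, numTasks: Int): Unit = buffer.clear()

  // Buffer each record; send a bulk request once maxActions records are pending.
  def writeRecord(record: String): Unit = {
    buffer += record
    if (buffer.size >= maxActions) flush()
  }

  // Send whatever is left so the final partial batch is not lost.
  def close(): Unit = flush()

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      client.bulk(buffer.toList)
      buffer.clear()
    }
}
```

Flushing in close() matters: without it, the last partial batch of each parallel task would be silently dropped.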
>>
>> Cheers,
>> Gordon
>>
>>
>> On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pompermaier@okkam.it)
>> wrote:
>>
>>
>> Hi all,
>> at the moment I have a Flink job that generates a DataSet<String>, which I
>> write to a file that Logstash reads to index the data into ES.
>> I'd like to use the new ElasticsearchSink to index those JSON documents
>> directly from Flink, but ElasticsearchSink only works with the streaming
>> environment.
>>
>> Is there any bridge class for this?
>>
>> Best,
>> Flavio
>>
>>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
