flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re:Re: ElasticsearchSink on DataSet
Date Tue, 09 May 2017 06:17:06 GMT
Thanks a lot for the support!

On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <tzulitai@apache.org> wrote:

> Hi!
>
> Thanks for sharing that repo! I think that would be quite an useful
> contribution to Flink for the users, if you’re up to preparing a PR for it
> :)
>
> It also looks like you’ve adopted most of the current ElasticsearchSink
> APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the
> ElasticsearchOutputFormat, which is nice to fit into the current code :-D
>
> Cheers,
> Gordon
>
>
> On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2007@163.com) wrote:
>
> Hi Flavio
>
> Maybe this is what you want: https://github.com/397090770/
> flink-elasticsearch2-connector, It can save Flink DataSet to
> elasticsearch.
>
> import scala.collection.JavaConversions._
>
> val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val
hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host),
9300)).toListval data : DataSet[String] = ....
> data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String]
{      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
>       }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer)
{
>         indexer.add(createIndexRequest(element))
>       }
> }))
>
>
> I hope this could help you
>
> 在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<tzulitai@apache.org>写道:
>
>
> Hi Flavio,
>
> I don’t think there is a bridge class for this. At the moment you’ll have
> to implement your own OutputFormat.
> The ElasticsearchSink is a SinkFunction which is part of the DataStream
> API, which generally speaking at the moment has no bridge or unification
> yet with the DataSet API.
>
> Cheers,
> Gordon
>
>
> On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pompermaier@okkam.it)
> wrote:
>
>
> Hi to all,
> at the moment I have a Flink Job that generates a DataSet<String> that I
> write to a File that is read by Logstash to index data on ES.
> I'd like to use the new ElasticsearchSink to index those JSON directly
> from Flink but ElasticsearchSink only works with streaming environment.
>
> Is there any bridge class for this?
>
> Best,
> Flavio
>
>

Mime
View raw message