flink-issues mailing list archives

From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...
Date Fri, 13 Jan 2017 10:51:36 GMT
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r95969123
  
    --- Diff: docs/dev/connectors/elasticsearch2.md ---
    @@ -1,173 +0,0 @@
    ----
    -title: "Elasticsearch 2.x Connector"
    -nav-title: Elasticsearch 2.x
    -nav-parent_id: connectors
    -nav-pos: 5
    ----
    -<!--
    -Licensed to the Apache Software Foundation (ASF) under one
    -or more contributor license agreements.  See the NOTICE file
    -distributed with this work for additional information
    -regarding copyright ownership.  The ASF licenses this file
    -to you under the Apache License, Version 2.0 (the
    -"License"); you may not use this file except in compliance
    -with the License.  You may obtain a copy of the License at
    -
    -  http://www.apache.org/licenses/LICENSE-2.0
    -
    -Unless required by applicable law or agreed to in writing,
    -software distributed under the License is distributed on an
    -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    -KIND, either express or implied.  See the License for the
    -specific language governing permissions and limitations
    -under the License.
    --->
    -
    -This connector provides a Sink that can write to an
    -[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
    -following dependency to your project:
    -
    -{% highlight xml %}
    -<dependency>
    -  <groupId>org.apache.flink</groupId>
    -  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
    -  <version>{{site.version }}</version>
    -</dependency>
    -{% endhighlight %}
    -
    -Note that the streaming connectors are currently not part of the binary
    -distribution. See
    -[here]({{site.baseurl}}/dev/linking)
    -for information about how to package the program with the libraries for
    -cluster execution.
    -
    -#### Installing Elasticsearch 2.x
    -
    -Instructions for setting up an Elasticsearch cluster can be found
    -[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
    -Make sure to set and remember a cluster name. This must be set when
    -creating a Sink for writing to your cluster.
    -
    -#### Elasticsearch 2.x Sink
    -The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
    -
    -The sink communicates with Elasticsearch via the Transport Client.
    -
    -See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
    -for information about the Transport Client.
    -
    -The code below shows how to create a sink that uses a `TransportClient` for communication:
    -
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<String> input = ...;
    -
    -Map<String, String> config = new HashMap<>();
    -// This instructs the sink to emit after every element, otherwise they would be buffered
    -config.put("bulk.flush.max.actions", "1");
    -config.put("cluster.name", "my-cluster-name");
    -
    -List<InetSocketAddress> transports = new ArrayList<>();
    -transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    -transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
    -
    -input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction<String>() {
    -  public IndexRequest createIndexRequest(String element) {
    -    Map<String, String> json = new HashMap<>();
    -    json.put("data", element);
    -
    -    return Requests.indexRequest()
    -            .index("my-index")
    -            .type("my-type")
    -            .source(json);
    -  }
    -
    -  @Override
    -  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    -    indexer.add(createIndexRequest(element));
    -  }
    -}));
    -{% endhighlight %}
    -</div>
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[String] = ...
    -
    -val config = new util.HashMap[String, String]
    -config.put("bulk.flush.max.actions", "1")
    -config.put("cluster.name", "my-cluster-name")
    -
    -val transports = new util.ArrayList[InetSocketAddress]
    -transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
    -transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
    -
    -input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
    -  def createIndexRequest(element: String): IndexRequest = {
    -    val json = new util.HashMap[String, AnyRef]
    -    json.put("data", element)
    -    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
    -  }
    -
    -  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
    -    indexer.add(createIndexRequest(element))
    -  }
    -}))
    -{% endhighlight %}
    -</div>
    -</div>
    -
    -A Map of Strings is used to configure the Sink. The configuration keys
    -are documented in the Elasticsearch documentation
    -[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
    -Especially important is the `cluster.name` parameter, which must correspond to
    -the name of your cluster; with Elasticsearch 2.x you also need to specify `path.home`.
    -
    -Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster.
    -This will buffer elements and Action requests before sending them to the cluster. The behaviour of the
    -`BulkProcessor` can be configured using these config keys:
    - * **bulk.flush.max.actions**: Maximum number of elements to buffer
    - * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
    - * **bulk.flush.interval.ms**: Interval in milliseconds at which to flush data regardless of the other two settings
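    A minimal sketch of how these flush keys could be combined in the sink's configuration Map. The values below are purely illustrative (not defaults): buffer up to 500 actions or 5 MB, and flush at least every 60 seconds regardless.
    
    {% highlight java %}
    import java.util.HashMap;
    import java.util.Map;
    
    public class BulkFlushConfigSketch {
      public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster-name"); // must match your cluster
        // Illustrative flush thresholds; the first one reached triggers a flush.
        config.put("bulk.flush.max.actions", "500");
        config.put("bulk.flush.max.size.mb", "5");
        config.put("bulk.flush.interval.ms", "60000");
        System.out.println(config.size()); // prints 4
      }
    }
    {% endhighlight %}
    
    Note that all values are passed as Strings, as the sink takes a `Map<String, String>`.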
    -
    -The example also provides a list of Elasticsearch nodes
    -to which the sink should connect via a `TransportClient`.
    -
    -More information about Elasticsearch can be found [here](https://elastic.co).
    -
    -
    -#### Packaging the Elasticsearch Connector into an Uber-jar
    -
    -For the execution of your Flink program,
    -it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies
    -(see [here]({{site.baseurl}}/dev/linking) for further information).
    -
    -However,
    -when an uber-jar containing an Elasticsearch sink is executed,
    -an `IllegalArgumentException` may occur,
    -which is caused by conflicting files of Elasticsearch and its dependencies
    -in `META-INF/services`:
    -
    -```
    -IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: [es090, completion090, XBloomFilter]]
    -```
    -
    -If the uber-jar is built with Maven,
    -this issue can be avoided by adding the following to the pom file:
    -
    -```
    -<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    -    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
    -</transformer>
    -<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    -    <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
    -</transformer>
    -<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    -    <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
    -</transformer>
    --- End diff --
    
    Ah, I see. This has been there before :) Still, would be great if you could fix it :)


