flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
Date Wed, 12 Sep 2018 15:41:23 GMT
dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r217056983
 
 

 ##########
 File path: docs/dev/table/connect.md
 ##########
 @@ -583,6 +584,104 @@ Make sure to add the version-specific Kafka dependency. In addition, a correspon
 
 {% top %}
 
+### Elasticsearch Connector
+
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+<span class="label label-info">Format: JSON-only</span>
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.
+
+The connector operates in [upsert mode](#update-modes) and exchanges UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion). It can be defined as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
+                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
+    .failureHandlerIgnore()        //   or ignores failures and drops the request
+    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
+    .failureHandlerCustom(...)     //   or custom failure handling with an ActionRequestFailureHandler subclass
+
+    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
+    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
+    .bulkFlushMaxSize(42)          // optional: maximum size of buffered actions (in MB) per bulk request
+    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)
+
+    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
+    .bulkFlushBackoffExponential() //   or use an exponential backoff type
+    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
+    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)
+
+    // optional: connection properties to be used during REST communication to Elasticsearch
+    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
+    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: elasticsearch     # required: connector type must be "elasticsearch"
+  version: 6              # required: valid connector versions are "6"
+  hosts:                  # required: one or more Elasticsearch hosts to connect to
+    - hostname: "localhost"
+      port: 9200
+      schema: "http"
+  index: "MyUsers"        # required: Elasticsearch index
+  document-type: "user"   # required: Elasticsearch document type
+
+  key-delimiter: "$"      # optional: delimiter for composite keys ("_" by default)
+                          #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+  key-null-literal: "n/a" # optional: representation for null fields in keys ("null" by default)
+
+  # optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
+  failure-handler: ...    # valid strategies are "fail" (throws an exception if a request fails and
+                          #   thus causes a job failure), "ignore" (ignores failures and drops the request),
+                          #   "retry-rejected" (re-adds requests that have failed due to queue capacity
+                          #   saturation), or "custom" for failure handling with an
+                          #   ActionRequestFailureHandler subclass
+
+  # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+  flush-on-checkpoint: true   # optional: setting this to "false" disables flushing on checkpoint
+                              #   (see notes below!) ("true" by default)
+  bulk-flush:
+    max-actions: 42           # optional: maximum number of actions to buffer for each bulk request
+    max-size: 42              # optional: maximum size of buffered actions (in MB) per bulk request
+    interval: 60000           # optional: bulk flush interval (in milliseconds)
+    back-off:                 # optional: backoff strategy ("disabled" by default)
+      type: ...               #   valid strategies are "disabled", "constant", or "exponential"
+      max-retries: 3          # optional: maximum number of retries
+      delay: 30000            # optional: delay between each backoff attempt (in milliseconds)
+
+  # optional: connection properties to be used during REST communication to Elasticsearch
+  connection-max-retry-timeout: 3   # optional: maximum timeout (in milliseconds) between retries
+  connection-path-prefix: "/v1"     # optional: prefix string to be added to every REST communication
+{% endhighlight %}
+</div>
+</div>
+
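+The following is a hypothetical end-to-end sketch (the table name, schema fields, and format options are illustrative and not part of this change), assuming the 1.6-era `connect()` API, showing how the descriptor above could be registered as an upsert sink:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+tableEnv
+  // connector: the Elasticsearch descriptor introduced by this pull request
+  .connect(
+    new Elasticsearch()
+      .version("6")
+      .host("localhost", 9200, "http")
+      .index("MyUsers")
+      .documentType("user"))
+  // format: documents are encoded as JSON (requires the JSON format dependency)
+  .withFormat(new Json().deriveSchema())
+  // schema of the table backing the sink
+  .withSchema(new Schema()
+    .field("user_name", Types.STRING())
+    .field("cnt", Types.LONG()))
+  // exchange UPSERT/DELETE messages with the external system
+  .inUpsertMode()
+  .registerTableSink("EsUsers");
+{% endhighlight %}
+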
+**Bulk flushing:** For more information about the characteristics of the optional flushing parameters, see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).
+
+**Disabling flushing on checkpoint:** When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests.
+
+**Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
+
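+For illustration only (this is not the connector's internal code), the ID construction for the example query and the descriptor options shown earlier can be sketched as:
+
+{% highlight java %}
+// hypothetical values matching the descriptor options above
+String keyDelimiter = "$";      // set via keyDelimiter("$")
+String keyNullLiteral = "n/a";  // set via keyNullLiteral("n/a")
+
+// a result row of `SELECT a, b, c FROM t GROUP BY a, b`
+// with key fields a = "bob" and b = null yields:
+String docId = "bob" + keyDelimiter + keyNullLiteral;  // "bob$n/a"
+{% endhighlight %}
+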
+A JSON format defines how to encode documents for the external system; therefore, it must be added as a dependency.
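+
+For example, a sketch of the Maven dependency for the JSON format (the version placeholder follows the docs' usual convention):
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-json</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}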
 
 Review comment:
   How about a link to the JSON format in the dependencies table?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
