From: GitBox
To: issues@flink.apache.org
Subject: [GitHub] dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
Date: Wed, 12 Sep 2018 15:41:23 -0000

dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r217083933

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
##########

@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_DELAY;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_BULK_FLUSH_MAX_SIZE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_CONNECTION_PATH_PREFIX;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_DOCUMENT_TYPE;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_CLASS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_FLUSH_ON_CHECKPOINT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_HOSTNAME;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_PORT;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_HOSTS_SCHEMA;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL;
+import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH;
+
+/**
+ * Connector descriptor for the Elasticsearch search engine.
+ */
+public class Elasticsearch extends ConnectorDescriptor {
+
+    private String version;
+    private List<Host> hosts = new ArrayList<>();
+    private String index;
+    private String documentType;
+    private String keyDelimiter;
+    private String keyNullLiteral;
+    private String failureHandlerType;
+    private Class<? extends ActionRequestFailureHandler> failureHandlerClass;
+    private Boolean flushOnCheckpoint;
+    private Integer bulkFlushMaxActions;
+    private Integer bulkFlushMaxSize;
+    private Long bulkFlushInterval;
+    private String bulkFlushBackoffType;
+    private Integer bulkFlushBackoffMaxRetries;
+    private Long bulkFlushBackoffDelay;
+    private Integer connectionMaxRetryTimeout;
+    private String connectionPathPrefix;
+
+    /**
+     * Connector descriptor for the Elasticsearch search engine.
+     */
+    public Elasticsearch() {
+        super(CONNECTOR_TYPE_VALUE_ELASTICSEARCH, 1, true);
+    }
+
+    /**
+     * Sets the Elasticsearch version to be used. Required.
+     *
+     * @param version Elasticsearch version. E.g., "6".
+     */
+    public Elasticsearch version(String version) {
+        this.version = Preconditions.checkNotNull(version);
+        return this;
+    }
+
+    /**
+     * Adds an Elasticsearch host to connect to. Required.
+     *
+     * <p>Multiple hosts can be declared by calling this method multiple times.
+     *
+     * @param hostname connection hostname
+     * @param port connection port
+     * @param schema connection schema; e.g., "http"
+     */
+    public Elasticsearch host(String hostname, int port, String schema) {
+        final Host host =
+            new Host(
+                Preconditions.checkNotNull(hostname),
+                port,
+                Preconditions.checkNotNull(schema));
+        hosts.add(host);
+        return this;
+    }
+
+    /**
+     * Declares the Elasticsearch index for every record. Required.
+     *
+     * @param index Elasticsearch index
+     */
+    public Elasticsearch index(String index) {
+        this.index = Preconditions.checkNotNull(index);
+        return this;
+    }
+
+    /**
+     * Declares the Elasticsearch document type for every record. Required.
+     *
+     * @param documentType Elasticsearch document type
+     */
+    public Elasticsearch documentType(String documentType) {
+        this.documentType = Preconditions.checkNotNull(documentType);
+        return this;
+    }
+
+    /**
+     * Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from
+     * multiple fields. Optional.
+     *
+     * @param keyDelimiter key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+     */
+    public Elasticsearch keyDelimiter(String keyDelimiter) {
+        this.keyDelimiter = Preconditions.checkNotNull(keyDelimiter);
+        return this;
+    }
+
+    /**
+     * Sets a custom representation for null fields in keys. Optional.
+     *
+     * @param keyNullLiteral key null literal string; e.g., "N/A" would result in IDs "KEY1_N/A_KEY3"
+     */
+    public Elasticsearch keyNullLiteral(String keyNullLiteral) {
+        this.keyNullLiteral = Preconditions.checkNotNull(keyNullLiteral);
+        return this;
+    }
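    // ------------------------------------------------------------------
    // [Editor's illustration, not part of the patch] Assuming a table
    // environment `tableEnv` exposing the connect(ConnectorDescriptor)
    // API available at the time, the setters above could be chained
    // like this (host, index, and document type values are made up):
    //
    //   tableEnv.connect(
    //       new Elasticsearch()
    //           .version("6")
    //           .host("localhost", 9200, "http")
    //           .index("my-index")
    //           .documentType("user")
    //           .keyDelimiter("$")
    //           .keyNullLiteral("n/a"))
    //       ...
    // ------------------------------------------------------------------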
+
+    /**
+     * Configures a failure handling strategy in case a request to Elasticsearch fails.
+     *
+     * <p>This strategy throws an exception if a request fails and thus causes a job failure.
+     */
+    public Elasticsearch failureHandlerFail() {
+        this.failureHandlerType = ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_FAIL;
+        this.failureHandlerClass = null;
+        return this;
+    }
+
+    /**
+     * Configures a failure handling strategy in case a request to Elasticsearch fails.
+     *
+     * <p>This strategy ignores failures and drops the request.
+     */
+    public Elasticsearch failureHandlerIgnore() {
+        this.failureHandlerType = ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE;
+        this.failureHandlerClass = null;
+        return this;
+    }
+
+    /**
+     * Configures a failure handling strategy in case a request to Elasticsearch fails.
+     *
+     * <p>This strategy re-adds requests that have failed due to queue capacity saturation.
+     */
+    public Elasticsearch failureHandlerRetryRejected() {
+        this.failureHandlerType = ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_RETRY;
+        this.failureHandlerClass = null;
+        return this;
+    }
+
+    /**
+     * Configures a failure handling strategy in case a request to Elasticsearch fails.
+     *
+     * <p>This strategy allows for custom failure handling using an {@link ActionRequestFailureHandler}.
+     */
+    public Elasticsearch failureHandlerCustom(Class<? extends ActionRequestFailureHandler> failureHandlerClass) {
+        this.failureHandlerType = ElasticsearchValidator.CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM;
+        this.failureHandlerClass = Preconditions.checkNotNull(failureHandlerClass);
+        return this;
+    }
+
+    /**
+     * Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action
+     * requests to be acknowledged by Elasticsearch on checkpoints.
+     *
+     * <p>Note: If flushing on checkpoint is disabled, an Elasticsearch sink does NOT
+     * provide any strong guarantees for at-least-once delivery of action requests.
+     */
+    public Elasticsearch disableFlushOnCheckpoint() {
+        this.flushOnCheckpoint = false;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets the maximum number of actions to buffer for each bulk request.
+     *
+     * @param maxActions the maximum number of actions to buffer per bulk request.
+     */
+    public Elasticsearch bulkFlushMaxActions(int maxActions) {
+        this.bulkFlushMaxActions = maxActions;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets the maximum size of buffered actions (in MB) per bulk request.
+     */
+    public Elasticsearch bulkFlushMaxSize(int maxSizeMb) {
+        this.bulkFlushMaxSize = maxSizeMb;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets the bulk flush interval (in milliseconds).
+     *
+     * @param interval bulk flush interval (in milliseconds).
+     */
+    public Elasticsearch bulkFlushInterval(long interval) {
+        this.bulkFlushInterval = interval;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets a constant backoff type to use when flushing bulk requests.
+     */
+    public Elasticsearch bulkFlushBackoffConstant() {
+        this.bulkFlushBackoffType = ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets an exponential backoff type to use when flushing bulk requests.
+     */
+    public Elasticsearch bulkFlushBackoffExponential() {
+        this.bulkFlushBackoffType = ElasticsearchValidator.CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+     *
+     * <p>Make sure to enable backoff by selecting a strategy ({@link #bulkFlushBackoffConstant()} or
+     * {@link #bulkFlushBackoffExponential()}).
+     *
+     * @param maxRetries the maximum number of retries.
+     */
+    public Elasticsearch bulkFlushBackoffMaxRetries(int maxRetries) {
+        this.bulkFlushBackoffMaxRetries = maxRetries;
+        return this;
+    }
+
+    /**
+     * Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
+     *
+     * <p>Sets the amount of delay between each backoff attempt when flushing bulk requests
+     * (in milliseconds).
+     *
+     * <p>Make sure to enable backoff by selecting a strategy ({@link #bulkFlushBackoffConstant()} or
+     * {@link #bulkFlushBackoffExponential()}).
+     *
+     * @param delay delay between each backoff attempt (in milliseconds).
+     */
+    public Elasticsearch bulkFlushBackoffDelay(long delay) {
+        this.bulkFlushBackoffDelay = delay;
+        return this;
+    }
+
+    /**
+     * Sets connection properties to be used during REST communication to Elasticsearch.
+     *
+     * <p>Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.
+     *
+     * @param maxRetryTimeout maximum timeout (in milliseconds)
+     */
+    public Elasticsearch connectionMaxRetryTimeout(int maxRetryTimeout) {
+        this.connectionMaxRetryTimeout = maxRetryTimeout;
+        return this;
+    }
+
+    /**
+     * Sets connection properties to be used during REST communication to Elasticsearch.
+     *
+     * <p>Adds a path prefix to every REST communication.
+     *
+     * @param pathPrefix prefix string to be added to every REST communication
+     */
+    public Elasticsearch connectionPathPrefix(String pathPrefix) {
+        this.connectionPathPrefix = pathPrefix;
+        return this;
+    }
+
+    @Override
+    public void addConnectorProperties(DescriptorProperties properties) {
+        if (version != null) {
+            properties.putString(CONNECTOR_VERSION(), version);
+        }
+
+        final List<List<String>> hostValues = hosts.stream()
+            .map(host -> Arrays.asList(host.hostname, String.valueOf(host.port), host.schema))
+            .collect(Collectors.toList());
+        properties.putIndexedFixedProperties(
+            CONNECTOR_HOSTS,
+            Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_SCHEMA),
+            hostValues);
+
+        if (index != null) {

Review comment:
   Could we keep all of those properties in `DescriptorProperties` from the start and just merge them into the `properties` parameter here? That way we would save some boilerplate code for checking against null. What do you think?
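   For illustration, a rough sketch of what the suggested refactoring could look like (this is not part of the patch; the no-arg `DescriptorProperties` constructor and the `putString`/`putProperties`/`asMap` calls are assumed from its API at the time, and only one setter is shown):

       // Sketch: collect values eagerly in an internal DescriptorProperties
       // instead of keeping one nullable field per option.
       private final DescriptorProperties internalProperties = new DescriptorProperties();

       public Elasticsearch index(String index) {
           // The null check still happens once, in the setter itself.
           internalProperties.putString(CONNECTOR_INDEX, Preconditions.checkNotNull(index));
           return this;
       }

       @Override
       public void addConnectorProperties(DescriptorProperties properties) {
           // Merge everything collected so far; the per-field null checks disappear.
           properties.putProperties(internalProperties.asMap());
       }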