flink-issues mailing list archives

From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #3358: [FLINK-5487] [elasticsearch] At-least-once Elastic...
Date Tue, 21 Feb 2017 11:11:56 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102183824
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -67,10 +73,56 @@
     	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
     	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
     	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    +	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
    +	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
    +	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
    +	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
    +
    +	public enum FlushBackoffType {
    +		CONSTANT,
    +		EXPONENTIAL
    +	}
    +
    +	public class BulkFlushBackoffPolicy implements Serializable {
    +
    +		private static final long serialVersionUID = -6022851996101826049L;
    +
    +		// the default values follow the Elasticsearch default settings for BulkProcessor
    +		private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
    +		private int maxRetryCount = 8;
    +		private long delayMillis = 50;
    +
    +		public FlushBackoffType getBackoffType() {
    +			return backoffType;
    +		}
    +
    +		public int getMaxRetryCount() {
    +			return maxRetryCount;
    +		}
    +
    +		public long getDelayMillis() {
    +			return delayMillis;
    +		}
    +
    +		public void setBackoffType(FlushBackoffType backoffType) {
    +			this.backoffType = checkNotNull(backoffType);
    +		}
    +
    +		public void setMaxRetryCount(int maxRetryCount) {
    +			checkArgument(maxRetryCount > 0);
    +			this.maxRetryCount = maxRetryCount;
    +		}
    +
    +		public void setDelayMillis(long delayMillis) {
    +			checkArgument(delayMillis > 0);
    --- End diff --
    
    True, 0 should be an acceptable delay here. Nice catches.
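
For illustration, a minimal sketch of what a relaxed check on the delay setter could look like. It assumes the fix is simply to allow 0 and reject only negative values; the class name `BulkFlushBackoffPolicySketch` is hypothetical, and a plain `IllegalArgumentException` stands in for the `checkArgument` call in the diff so the snippet compiles on its own. It is not the code that was merged.

```java
import java.io.Serializable;

// Hypothetical standalone stand-in for the BulkFlushBackoffPolicy class in the diff.
// Only the delay setter is shown; the check is relaxed from "> 0" to ">= 0".
class BulkFlushBackoffPolicySketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // same default as in the diff
    private long delayMillis = 50;

    public long getDelayMillis() {
        return delayMillis;
    }

    public void setDelayMillis(long delayMillis) {
        // 0 is accepted (retry without waiting); only negative delays are rejected
        if (delayMillis < 0) {
            throw new IllegalArgumentException(
                "delayMillis must be >= 0, but was " + delayMillis);
        }
        this.delayMillis = delayMillis;
    }
}
```

With this check, `setDelayMillis(0)` succeeds and `setDelayMillis(-1)` throws, leaving the previously set value unchanged.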


