flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
Date Mon, 05 Dec 2016 17:47:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15722847#comment-15722847 ]

ASF GitHub Bot commented on FLINK-5122:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90917039
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
     
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    +				boolean allRequestsRepeatable = true;
     				if (response.hasFailures()) {
     					for (BulkItemResponse itemResp : response.getItems()) {
     						if (itemResp.isFailed()) {
    -							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
    -							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
    +							// Check if index request can be retried
    +							String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
    +							if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
    +									|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
    +									|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
    +								) {
    +								LOG.debug("Retry batch: " + itemResp.getFailureMessage());
    +								reAddBulkRequest(request);
    +							} else { // Cannot retry action
    +								allRequestsRepeatable = false;
    +								LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
    +								failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
    +							}
     						}
     					}
    -					hasFailure.set(true);
    +					if (!allRequestsRepeatable) {
    +						hasFailure.set(true);
    +					}
     				}
     			}
     
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    -				LOG.error(failure.getMessage());
    -				failureThrowable.compareAndSet(null, failure);
    -				hasFailure.set(true);
    +				if (failure instanceof ClusterBlockException // Examples: "no master"
    +						|| failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
    +						) 
    +				{
    +					LOG.debug("Retry batch on throwable: " + failure.getMessage());
    --- End diff --
    
    String concat: use a parameterized log message (`{}` placeholder) instead of concatenating with `+`.


> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster was under high load and some documents were not indexed. This violates the "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be created and rebalanced. On those errors the bulk request should be retried instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] requests]]
> Bulk queue on node full (I set the queue capacity to a low value to reproduce the error):
> 22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];
> I can try to propose a PR for this.
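
The retry decision described in the issue can be illustrated with a small, dependency-free sketch. It is not the code from the pull request: the class `RetryableFailureSketch` and the method `isRetryable` are hypothetical names, the sample messages are abridged from the logs quoted above, and the `MapperParsingException` message is an invented example of a permanent failure. Matching on message text is fragile and is used here only because the failure message is the information shown in the logs.

```java
import java.util.Locale;

public class RetryableFailureSketch {

    /**
     * Hypothetical helper: returns true if a bulk item failure message
     * looks like a transient condition (timeout, shard temporarily
     * unavailable, bulk queue full) that is worth retrying.
     */
    public static boolean isRetryable(String failureMessage) {
        if (failureMessage == null) {
            return false;
        }
        // Locale.ROOT avoids locale-specific lowercasing surprises.
        String msg = failureMessage.toLowerCase(Locale.ROOT);
        return msg.contains("timeout")
                || msg.contains("timed out")
                || msg.contains("unavailableshardsexception")
                || msg.contains("esrejectedexecutionexception");
    }

    public static void main(String[] args) {
        // Abridged from the two log excerpts in the issue description.
        String shardInactive =
                "UnavailableShardsException[[index-name][3] primary shard is not active]";
        String queueFull =
                "EsRejectedExecutionException[rejected execution on EsThreadPoolExecutor[bulk, queue capacity = 1]]";
        // Invented example of a failure that retrying would not fix.
        String badDocument = "MapperParsingException[failed to parse]";

        System.out.println(isRetryable(shardInactive)); // true
        System.out.println(isRetryable(queueFull));     // true
        System.out.println(isRetryable(badDocument));   // false
    }
}
```

A sink would re-add the request for retryable failures and record only the non-retryable ones as errors, which is the behavior the issue asks for.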



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
