flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
Date Wed, 01 Feb 2017 07:08:51 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15848072#comment-15848072
] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5122 at 2/1/17 7:07 AM:
--------------------------------------------------------------------

I would like to handle this issue together with FLINK-5353 with a different approach: let
the user provide a {{FailedActionRequestHandler}} that implements how to deal with an action
request that failed, ex. drop it or re-add it to the {{BulkProcessor}}.

The reason for this is that there is actually quite a variety of different reasons an action
request can fail, and for different cases, can be treated to be "temporary" differently. For
example, in FLINK-5353, malformed documents can somewhat be "temporary" if the erroneous field
is reprocessed. Instead of handling these case by case, I propose to let user implement logic
for them.

The handler will look something like this:

{code}
public interface FailedActionRequestHandler {
    boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer);
}
{code}

The ElasticsearchSink will still try to retry a bulk request (with backoff) for obvious temporary
errors like {{EsRejectedExecutionException}}, and will only call {{onFailure}} after the retries.
There the user can decide whether they want to re-add it to be requested through the {{RequestIndexer}}
or just drop it. The method should return {{true}} / {{false}} depending on whether they'd
like to fail the sink because of that failure.

What do you think? Sorry for being picky about how to resolve this. I think it'll be best
to find a good long-term solution and it'll be helpful to know what actual ES Flink users
think of the idea.


was (Author: tzulitai):
I would like to handle this issue together with FLINK-5353 with a different approach: let
the user provide a {{FailedActionRequestHandler}} that implements how to deal with an action
request that failed, ex. drop it or re-add it to the {{BulkProcessor}}.

The reason for this is that there is actually quite a variety of different reasons an action
request can fail, and for different cases, can be treated to be "temporary" differently. For
example, in FLINK-5353, malformed documents can somewhat be "temporary" if the erroneous field
is reprocessed. Instead of handling these case by case, I propose to let user implement logic
for them.

The handler will look something like this:

{code}
public interface FailedActionRequestHandler {
    boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer);
}
{code}

The ElasticsearchSink will still try to retry a bulk request (with backoff) for obvious temporary
errors like {{EsRejectedExecutionException}}, and will only call {{onFailure}} after the retries.
There the user can decide whether they want to re-add it to be requested through the {{RequestIndexer}}
or just drop it. The method should return {{true}} / {{false}} depending on whether they'd
like to fail the sink because of that failure.

What do you think?

> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster had high load and documents got not indexed. This violates the "at least once"
semantics in the ES connector.
> I gave pressure on my cluster to test Flink, causing new indices to be created and balanced.
On those errors the bulk should be tried again instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink
 - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary
shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20]
requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink
 - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c
on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message