Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8D39D200BE7 for ; Mon, 5 Dec 2016 18:47:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8BF62160B09; Mon, 5 Dec 2016 17:47:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D33EA160B18 for ; Mon, 5 Dec 2016 18:47:49 +0100 (CET) Received: (qmail 96567 invoked by uid 500); 5 Dec 2016 17:47:49 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 96557 invoked by uid 99); 5 Dec 2016 17:47:49 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Dec 2016 17:47:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 7B36BC0118 for ; Mon, 5 Dec 2016 17:47:48 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -7.019 X-Spam-Level: X-Spam-Status: No, score=-7.019 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id D8gZ2q6F8xRR for ; Mon, 5 Dec 2016 17:47:47 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 19C7C5FCD2 for ; Mon, 5 Dec 2016 17:47:46 +0000 (UTC) Received: (qmail 96414 invoked by uid 99); 5 Dec 2016 17:47:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Dec 2016 17:47:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 21F52DFB89; Mon, 5 Dec 2016 17:47:46 +0000 (UTC) From: rmetzger To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the... Content-Type: text/plain Message-Id: <20161205174746.21F52DFB89@git1-us-west.apache.org> Date: Mon, 5 Dec 2016 17:47:46 +0000 (UTC) archived-at: Mon, 05 Dec 2016 17:47:50 -0000 Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r90916979 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + boolean allRequestsRepeatable = true; if (response.hasFailures()) { for (BulkItemResponse itemResp : response.getItems()) { if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + // Check if index request can be retried + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full + ) { + LOG.debug("Retry batch: " + itemResp.getFailureMessage()); + reAddBulkRequest(request); + } else { // Cannot retry action + allRequestsRepeatable = false; + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); --- End diff -- {} instead of string concat. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---