Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44627187DC for ; Fri, 24 Apr 2015 22:25:18 +0000 (UTC) Received: (qmail 28339 invoked by uid 500); 24 Apr 2015 22:25:12 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 28287 invoked by uid 500); 24 Apr 2015 22:25:12 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 28276 invoked by uid 99); 24 Apr 2015 22:25:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Apr 2015 22:25:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6BE8DE0911; Fri, 24 Apr 2015 22:25:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sunchao@apache.org To: commits@hive.apache.org Message-Id: <25bb153b88444993b0f811ac4f5e798e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-10434 - Cancel connection when remote Spark driver process has failed [Spark Branch] (Chao, reviewed by Marcelo) Date: Fri, 24 Apr 2015 22:25:12 +0000 (UTC) Repository: hive Updated Branches: refs/heads/spark d64ca1fe6 -> 344ccad09 HIVE-10434 - Cancel connection when remote Spark driver process has failed [Spark Branch] (Chao, reviewed by Marcelo) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/344ccad0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/344ccad0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/344ccad0 Branch: refs/heads/spark Commit: 344ccad0980bbe6f8477e031fdf00a66d3281c4a Parents: d64ca1f Author: Chao Sun Authored: Fri Apr 24 15:16:36 2015 -0700 Committer: Chao Sun Committed: Fri Apr 24 15:16:36 2015 -0700 ---------------------------------------------------------------------- .../apache/hive/spark/client/SparkClientImpl.java | 3 ++- .../apache/hive/spark/client/rpc/RpcServer.java | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/344ccad0/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 71e432d..1bcd221 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -180,7 +180,7 @@ class SparkClientImpl implements SparkClient { protocol.cancel(jobId); } - private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret) + private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret) throws IOException { Runnable runnable; final String serverAddress = rpcServer.getAddress(); @@ -424,6 +424,7 @@ class SparkClientImpl implements SparkClient { try { int exitCode = child.waitFor(); if (exitCode != 0) { + rpcServer.cancelClient(clientId, "Child process exited before connecting back"); LOG.warn("Child process exited with code {}.", exitCode); } } catch (InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/hive/blob/344ccad0/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index 32d4c46..68ee627 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ -164,6 +164,24 @@ public class RpcServer implements Closeable { } /** + * Tells the RPC server to cancel the connection from an existing pending client + * @param clientId The identifier for the client + * @param msg The error message about why the connection should be canceled + */ + public void cancelClient(final String clientId, final String msg) { + final ClientInfo cinfo = pendingClients.remove(clientId); + if (cinfo == null) { + // Nothing to be done here. + return; + } + cinfo.timeoutFuture.cancel(true); + if (!cinfo.promise.isDone()) { + cinfo.promise.setFailure(new RuntimeException( + String.format("Cancel client '%s'. Error: " + msg, clientId))); + } + } + + /** * Creates a secret for identifying a client connection. */ public String createSecret() {