hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunc...@apache.org
Subject hive git commit: HIVE-10434 - Cancel connection when remote Spark driver process has failed [Spark Branch] (Chao, reviewed by Marcelo)
Date Fri, 24 Apr 2015 22:25:12 GMT
Repository: hive
Updated Branches:
  refs/heads/spark d64ca1fe6 -> 344ccad09


HIVE-10434 - Cancel connection when remote Spark driver process has failed [Spark Branch]
(Chao, reviewed by Marcelo)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/344ccad0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/344ccad0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/344ccad0

Branch: refs/heads/spark
Commit: 344ccad0980bbe6f8477e031fdf00a66d3281c4a
Parents: d64ca1f
Author: Chao Sun <sunchao@apache.org>
Authored: Fri Apr 24 15:16:36 2015 -0700
Committer: Chao Sun <sunchao@apache.org>
Committed: Fri Apr 24 15:16:36 2015 -0700

----------------------------------------------------------------------
 .../apache/hive/spark/client/SparkClientImpl.java |  3 ++-
 .../apache/hive/spark/client/rpc/RpcServer.java   | 18 ++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/344ccad0/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 71e432d..1bcd221 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -180,7 +180,7 @@ class SparkClientImpl implements SparkClient {
     protocol.cancel(jobId);
   }
 
-  private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret)
+  private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
       throws IOException {
     Runnable runnable;
     final String serverAddress = rpcServer.getAddress();
@@ -424,6 +424,7 @@ class SparkClientImpl implements SparkClient {
           try {
             int exitCode = child.waitFor();
             if (exitCode != 0) {
+              rpcServer.cancelClient(clientId, "Child process exited before connecting back");
               LOG.warn("Child process exited with code {}.", exitCode);
             }
           } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/hive/blob/344ccad0/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 32d4c46..68ee627 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -164,6 +164,24 @@ public class RpcServer implements Closeable {
   }
 
   /**
+   * Tells the RPC server to cancel the connection from an existing pending client
+   * @param clientId The identifier for the client
+   * @param msg The error message about why the connection should be canceled
+   */
+  public void cancelClient(final String clientId, final String msg) {
+    final ClientInfo cinfo = pendingClients.remove(clientId);
+    if (cinfo == null) {
+      // Nothing to be done here.
+      return;
+    }
+    cinfo.timeoutFuture.cancel(true);
+    if (!cinfo.promise.isDone()) {
+      cinfo.promise.setFailure(new RuntimeException(
+          String.format("Cancel client '%s'. Error: " + msg, clientId)));
+    }
+  }
+
+  /**
    * Creates a secret for identifying a client connection.
    */
   public String createSecret() {


Mime
View raw message