hive-commits mailing list archives

From ser...@apache.org
Subject [37/50] [abbrv] hive git commit: HIVE-10721 SparkSessionManagerImpl leaks SparkSessions [Spark Branch] (Jimmy reviewed by Xuefu, Chengxiang)
Date Tue, 02 Jun 2015 23:23:42 GMT
HIVE-10721 SparkSessionManagerImpl leaks SparkSessions [Spark Branch] (Jimmy reviewed by Xuefu, Chengxiang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3214f99
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3214f99
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3214f99

Branch: refs/heads/llap
Commit: b3214f996c9889035110ba548ea8eddf8390cdce
Parents: 3f27c6e
Author: Jimmy Xiang <jxiang@cloudera.com>
Authored: Fri May 15 08:34:38 2015 -0700
Committer: Xuefu Zhang <xzhang@Cloudera.com>
Committed: Mon Jun 1 14:04:09 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/spark/LocalHiveSparkClient.java     |  8 ++-
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  6 +-
 .../ql/exec/spark/session/SparkSessionImpl.java |  2 +-
 .../spark/session/SparkSessionManagerImpl.java  | 63 ++++++++++----------
 .../hive/spark/client/SparkClientImpl.java      |  2 +-
 5 files changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
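
In short: the two HiveSparkClient implementations now null-check before calling stop() in close(); SparkSessionImpl and SparkClientImpl widen a catch clause from Exception to Throwable so that errors, not just exceptions, are reported; and SparkSessionManagerImpl replaces its AtomicBoolean init guard with a volatile flag plus double-checked locking, creates the createdSessions set eagerly at field initialization, and registers a new session only after it has been opened successfully.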


http://git-wip-us.apache.org/repos/asf/hive/blob/b3214f99/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 7e33a3f..19d3fee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -199,7 +199,11 @@ public class LocalHiveSparkClient implements HiveSparkClient {
 
   @Override
   public void close() {
-    sc.stop();
-    client = null;
+    synchronized (LocalHiveSparkClient.class) {
+      client = null;
+    }
+    if (sc != null) {
+      sc.stop();
+    }
   }
 }
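
The LocalHiveSparkClient change clears the client reference while holding the LocalHiveSparkClient.class lock and only stops the Spark context if it is non-null. The class-level lock suggests client is a static singleton handed out by a class-synchronized factory method; a minimal sketch of that shape, with purely illustrative names rather than the Hive API:

    class StaticSingletonClose {
      private static StaticSingletonClose client;   // shared across threads

      // 'static synchronized' locks on StaticSingletonClose.class, the same
      // monitor close() takes below.
      static synchronized StaticSingletonClose getInstance() {
        if (client == null) {
          client = new StaticSingletonClose();
        }
        return client;
      }

      void close() {
        // Clear the shared reference under the class lock so a racing
        // getInstance() cannot hand out an instance that is shutting down.
        synchronized (StaticSingletonClose.class) {
          client = null;
        }
        // ... stop any underlying resources outside the lock
      }
    }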

http://git-wip-us.apache.org/repos/asf/hive/blob/b3214f99/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index bae30f3..8b15099 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -199,15 +199,19 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
   @Override
   public void close() {
-    remoteClient.stop();
+    if (remoteClient != null) {
+      remoteClient.stop();
+    }
   }
 
   private static class JobStatusJob implements Job<Serializable> {
 
+    private static final long serialVersionUID = 1L;
     private final byte[] jobConfBytes;
     private final byte[] scratchDirBytes;
     private final byte[] sparkWorkBytes;
 
+    @SuppressWarnings("unused")
     private JobStatusJob() {
       // For deserialization.
       this(null, null, null);
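
The RemoteHiveSparkClient hunk also pins the serialized form of the JobStatusJob helper with an explicit serialVersionUID and marks its no-arg constructor @SuppressWarnings("unused"), since that constructor exists only so the job can be deserialized.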

http://git-wip-us.apache.org/repos/asf/hive/blob/b3214f99/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 603f1ca..49e5f6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -53,7 +53,7 @@ public class SparkSessionImpl implements SparkSession {
     isOpen = true;
     try {
       hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
-    } catch (Exception e) {
+    } catch (Throwable e) {
       throw new HiveException("Failed to create spark client.", e);
     }
   }
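
Catching Throwable rather than Exception here means that errors raised while creating the Spark client (a NoClassDefFoundError from a missing jar, for example) are wrapped in the HiveException instead of escaping the session setup; SparkClientImpl receives the same widening further down, around the RPC client registration.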

http://git-wip-us.apache.org/repos/asf/hive/blob/b3214f99/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index ad012b6..616807c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -17,23 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.spark.client.SparkClientFactory;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.spark.client.SparkClientFactory;
 
 /**
  * Simple implementation of <i>SparkSessionManager</i>
@@ -44,8 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class SparkSessionManagerImpl implements SparkSessionManager {
   private static final Log LOG = LogFactory.getLog(SparkSessionManagerImpl.class);
 
-  private Set<SparkSession> createdSessions;
-  private AtomicBoolean inited = new AtomicBoolean(false);
+  private Set<SparkSession> createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
+  private volatile boolean inited = false;
 
   private static SparkSessionManagerImpl instance;
 
@@ -78,14 +74,18 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
 
   @Override
   public void setup(HiveConf hiveConf) throws HiveException {
-    if (inited.compareAndSet(false, true)) {
-      LOG.info("Setting up the session manager.");
-      createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
-      Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf);
-      try {
-        SparkClientFactory.initialize(conf);
-      } catch (IOException e) {
-        throw new HiveException("Error initializing SparkClientFactory", e);
+    if (!inited) {
+      synchronized (this) {
+        if (!inited) {
+          LOG.info("Setting up the session manager.");
+          Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf);
+          try {
+            SparkClientFactory.initialize(conf);
+            inited = true;
+          } catch (IOException e) {
+            throw new HiveException("Error initializing SparkClientFactory", e);
+          }
+        }
       }
     }
   }
@@ -104,14 +104,12 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
     if (existingSession != null) {
       // Open the session if it is closed.
       if (!existingSession.isOpen() && doOpen) {
-	existingSession.open(conf);
+        existingSession.open(conf);
       }
       return existingSession;
     }
 
     SparkSession sparkSession = new SparkSessionImpl();
-    createdSessions.add(sparkSession);
-
     if (doOpen) {
       sparkSession.open(conf);
     }
@@ -119,6 +117,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
     if (LOG.isDebugEnabled()) {
       LOG.debug(String.format("New session (%s) is created.", sparkSession.getSessionId()));
     }
+    createdSessions.add(sparkSession);
     return sparkSession;
   }
 
@@ -144,17 +143,15 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
   @Override
   public void shutdown() {
     LOG.info("Closing the session manager.");
-    if (createdSessions != null) {
-      synchronized (createdSessions) {
-        Iterator<SparkSession> it = createdSessions.iterator();
-        while (it.hasNext()) {
-          SparkSession session = it.next();
-          session.close();
-        }
-        createdSessions.clear();
+    synchronized (createdSessions) {
+      Iterator<SparkSession> it = createdSessions.iterator();
+      while (it.hasNext()) {
+        SparkSession session = it.next();
+        session.close();
       }
+      createdSessions.clear();
     }
-    inited.set(false);
+    inited = false;
     SparkClientFactory.stop();
   }
 }
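
Taken together, the SparkSessionManagerImpl changes move to a volatile flag with double-checked locking in setup(), create the synchronized session set eagerly, add a session to createdSessions only after open() has succeeded, and keep the shutdown-time iteration inside a synchronized (createdSessions) block, which Collections.synchronizedSet requires for safe iteration. A minimal, self-contained sketch of that concurrency shape (the String sessions and the stand-in open/close/initialize methods are illustrative, not Hive APIs):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class SessionManagerSketch {
      // Created eagerly, like createdSessions above. Individual calls are
      // thread-safe, but iteration still needs an explicit lock on the set.
      private final Set<String> sessions =
          Collections.synchronizedSet(new HashSet<String>());

      // volatile makes the unsynchronized first check in setup() safe.
      private volatile boolean inited = false;

      void setup() {
        if (!inited) {                    // fast path once initialized
          synchronized (this) {
            if (!inited) {                // re-check under the lock
              initializeFactory();        // stand-in for SparkClientFactory.initialize(conf)
              inited = true;              // flag set only after init succeeds,
            }                             // so a failed attempt can be retried
          }
        }
      }

      String getSession() throws Exception {
        String session = "session-" + System.nanoTime();
        open(session);                    // may throw
        sessions.add(session);            // tracked only once open succeeded
        return session;
      }

      void shutdown() {
        synchronized (sessions) {         // required when iterating a synchronizedSet
          for (String s : sessions) {
            close(s);
          }
          sessions.clear();
        }
        inited = false;
      }

      private void initializeFactory() { }
      private void open(String s) throws Exception { }
      private void close(String s) { }
    }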

http://git-wip-us.apache.org/repos/asf/hive/blob/b3214f99/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 1bcd221..9e34a49 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -97,7 +97,7 @@ class SparkClientImpl implements SparkClient {
     try {
       // The RPC server will take care of timeouts here.
       this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
-    } catch (Exception e) {
+    } catch (Throwable e) {
       LOG.warn("Error while waiting for client to connect.", e);
       driverThread.interrupt();
       try {

