Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B157E1750B for ; Mon, 18 May 2015 17:37:44 +0000 (UTC) Received: (qmail 31147 invoked by uid 500); 18 May 2015 17:37:44 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 31101 invoked by uid 500); 18 May 2015 17:37:44 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 31090 invoked by uid 99); 18 May 2015 17:37:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 May 2015 17:37:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5FC6EE0913; Mon, 18 May 2015 17:37:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jxiang@apache.org To: commits@hive.apache.org Message-Id: <8d0f11f401044531a508068410b5d65a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-10721 SparkSessionManagerImpl leaks SparkSessions [Spark Branch] (Jimmy reviewed by Xuefu, Chengxiang) Date: Mon, 18 May 2015 17:37:44 +0000 (UTC) Repository: hive Updated Branches: refs/heads/spark 889c41f12 -> 74ac99fa3 HIVE-10721 SparkSessionManagerImpl leaks SparkSessions [Spark Branch] (Jimmy reviewed by Xuefu, Chengxiang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/74ac99fa Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/74ac99fa Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/74ac99fa Branch: refs/heads/spark Commit: 74ac99fa34cc3c29ec76af72596a1a92e75b3d79 Parents: 889c41f Author: Jimmy Xiang Authored: Fri May 15 08:34:38 2015 -0700 Committer: Jimmy Xiang Committed: Mon May 18 10:36:14 2015 -0700 ---------------------------------------------------------------------- .../ql/exec/spark/LocalHiveSparkClient.java | 8 ++- .../ql/exec/spark/RemoteHiveSparkClient.java | 6 +- .../ql/exec/spark/session/SparkSessionImpl.java | 2 +- .../spark/session/SparkSessionManagerImpl.java | 63 ++++++++++---------- .../hive/spark/client/SparkClientImpl.java | 2 +- 5 files changed, 43 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/74ac99fa/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index 7e33a3f..19d3fee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -199,7 +199,11 @@ public class LocalHiveSparkClient implements HiveSparkClient { @Override public void close() { - sc.stop(); - client = null; + synchronized (LocalHiveSparkClient.class) { + client = null; + } + if (sc != null) { + sc.stop(); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/74ac99fa/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index bae30f3..8b15099 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -199,15 +199,19 @@ public class RemoteHiveSparkClient implements HiveSparkClient { @Override public void close() { - remoteClient.stop(); + if (remoteClient != null) { + remoteClient.stop(); + } } private static class JobStatusJob implements Job { + private static final long serialVersionUID = 1L; private final byte[] jobConfBytes; private final byte[] scratchDirBytes; private final byte[] sparkWorkBytes; + @SuppressWarnings("unused") private JobStatusJob() { // For deserialization. this(null, null, null); http://git-wip-us.apache.org/repos/asf/hive/blob/74ac99fa/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 603f1ca..49e5f6c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -53,7 +53,7 @@ public class SparkSessionImpl implements SparkSession { isOpen = true; try { hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf); - } catch (Exception e) { + } catch (Throwable e) { throw new HiveException("Failed to create spark client.", e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/74ac99fa/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index ad012b6..616807c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -17,23 +17,19 @@ */ package org.apache.hadoop.hive.ql.exec.spark.session; -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.spark.client.SparkClientFactory; - import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.spark.client.SparkClientFactory; /** * Simple implementation of SparkSessionManager @@ -44,8 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean; public class SparkSessionManagerImpl implements SparkSessionManager { private static final Log LOG = LogFactory.getLog(SparkSessionManagerImpl.class); - private Set createdSessions; - private AtomicBoolean inited = new AtomicBoolean(false); + private Set createdSessions = Collections.synchronizedSet(new HashSet()); + private volatile boolean inited = false; private static SparkSessionManagerImpl instance; @@ -78,14 +74,18 @@ public class SparkSessionManagerImpl implements SparkSessionManager { @Override public void setup(HiveConf hiveConf) throws HiveException { - if (inited.compareAndSet(false, true)) { - LOG.info("Setting up the session manager."); - createdSessions = Collections.synchronizedSet(new HashSet()); - Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf); - try { - SparkClientFactory.initialize(conf); - } catch (IOException e) { - throw new HiveException("Error initializing SparkClientFactory", e); + if (!inited) { + synchronized (this) { + if (!inited) { + LOG.info("Setting up the session manager."); + Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf); + try { + SparkClientFactory.initialize(conf); + inited = true; + } catch (IOException e) { + throw new HiveException("Error initializing SparkClientFactory", e); + } + } } } } @@ -104,14 +104,12 @@ public class SparkSessionManagerImpl implements SparkSessionManager { if (existingSession != null) { // Open the session if it is closed. if (!existingSession.isOpen() && doOpen) { - existingSession.open(conf); + existingSession.open(conf); } return existingSession; } SparkSession sparkSession = new SparkSessionImpl(); - createdSessions.add(sparkSession); - if (doOpen) { sparkSession.open(conf); } @@ -119,6 +117,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager { if (LOG.isDebugEnabled()) { LOG.debug(String.format("New session (%s) is created.", sparkSession.getSessionId())); } + createdSessions.add(sparkSession); return sparkSession; } @@ -144,17 +143,15 @@ public class SparkSessionManagerImpl implements SparkSessionManager { @Override public void shutdown() { LOG.info("Closing the session manager."); - if (createdSessions != null) { - synchronized (createdSessions) { - Iterator it = createdSessions.iterator(); - while (it.hasNext()) { - SparkSession session = it.next(); - session.close(); - } - createdSessions.clear(); + synchronized (createdSessions) { + Iterator it = createdSessions.iterator(); + while (it.hasNext()) { + SparkSession session = it.next(); + session.close(); } + createdSessions.clear(); } - inited.set(false); + inited = false; SparkClientFactory.stop(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/74ac99fa/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 1bcd221..9e34a49 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -97,7 +97,7 @@ class SparkClientImpl implements SparkClient { try { // The RPC server will take care of timeouts here. this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get(); - } catch (Exception e) { + } catch (Throwable e) { LOG.warn("Error while waiting for client to connect.", e); driverThread.interrupt(); try {