hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stak...@apache.org
Subject hive git commit: HIVE-19008: Improve Spark session id logging (Sahil Takiar, reviewed by Aihua Xu)
Date Mon, 06 Aug 2018 09:34:20 GMT
Repository: hive
Updated Branches:
  refs/heads/master 7cf791472 -> 7795c0a7d


HIVE-19008: Improve Spark session id logging (Sahil Takiar, reviewed by Aihua Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7795c0a7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7795c0a7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7795c0a7

Branch: refs/heads/master
Commit: 7795c0a7dc59941671f8845d78b16d9e5ddc9ea3
Parents: 7cf7914
Author: Sahil Takiar <takiar.sahil@gmail.com>
Authored: Sun Apr 1 21:21:43 2018 -0500
Committer: Sahil Takiar <stakiar@cloudera.com>
Committed: Mon Aug 6 11:34:00 2018 +0200

----------------------------------------------------------------------
 .../ql/exec/spark/HiveSparkClientFactory.java   | 13 +++---
 .../ql/exec/spark/session/SparkSessionImpl.java | 11 ++---
 .../spark/session/SparkSessionManagerImpl.java  |  3 +-
 .../hadoop/hive/ql/session/SessionState.java    |  6 +++
 .../session/TestSparkSessionManagerImpl.java    | 43 ++++++++++++++++++--
 5 files changed, 58 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 5ed5d42..0aae0d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -62,8 +62,9 @@ public class HiveSparkClientFactory {
   @VisibleForTesting
   public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sparkSessionId,
+                                                      String hiveSessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, hiveSessionId);
 
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
@@ -72,11 +73,11 @@ public class HiveSparkClientFactory {
       // With local spark context, all user sessions share the same spark context.
       return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf);
     } else {
-      return new RemoteHiveSparkClient(hiveconf, sparkConf, sessionId);
+      return new RemoteHiveSparkClient(hiveconf, sparkConf, hiveSessionId + "_" + sparkSessionId);
     }
   }
 
-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String hiveSessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
     HBaseConfiguration.addHbaseResources(hiveConf);
 
@@ -84,9 +85,9 @@ public class HiveSparkClientFactory {
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
-    final String sessionIdString = " (sessionId = " + sessionId + ")";
+    final String sessionIdString = " (hiveSessionId = " + hiveSessionId + ")";
     if (appName == null) {
-      if (sessionId == null) {
+      if (hiveSessionId == null) {
         appName = SPARK_DEFAULT_APP_NAME;
       } else {
         appName = SPARK_DEFAULT_APP_NAME + sessionIdString;

http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 0f2f031..2015810 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -72,8 +72,8 @@ public class SparkSessionImpl implements SparkSession {
   private Path scratchDir;
   private final Object dirLock = new Object();
 
-  public SparkSessionImpl() {
-    sessionId = makeSessionId();
+  SparkSessionImpl(String sessionId) {
+    this.sessionId = sessionId;
     initErrorPatterns();
   }
 
@@ -83,7 +83,8 @@ public class SparkSessionImpl implements SparkSession {
     this.conf = conf;
     isOpen = true;
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
+      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+              SessionState.get().getSessionId());
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
       HiveException he;
@@ -260,10 +261,6 @@ public class SparkSessionImpl implements SparkSession {
     return scratchDir;
   }
 
-  public static String makeSessionId() {
-    return UUID.randomUUID().toString();
-  }
-
   @VisibleForTesting
   HiveSparkClient getHiveSparkClient() {
     return hiveSparkClient;

http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 46cee0d..68c9e04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
       return existingSession;
     }
 
-    SparkSession sparkSession = new SparkSessionImpl();
+    SparkSession sparkSession = new SparkSessionImpl(SessionState.get().getNewSparkSessionId());
     if (doOpen) {
       sparkSession.open(conf);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 262bbb9..71e130b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -43,6 +43,7 @@ import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
@@ -315,6 +316,8 @@ public class SessionState {
 
   private List<Closeable> cleanupItems = new LinkedList<Closeable>();
 
+  private final AtomicLong sparkSessionId = new AtomicLong();
+
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -2059,6 +2062,9 @@ public class SessionState {
     return currentFunctionsInUse;
   }
 
+  public String getNewSparkSessionId() {
+    return Long.toString(this.sparkSessionId.getAndIncrement());
+  }
 }
 
 class ResourceMaps {

http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 6964764..853e4f4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.spark.SparkConf;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -41,10 +46,17 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestSparkSessionManagerImpl {
+
   private static final Logger LOG = LoggerFactory.getLogger(TestSparkSessionManagerImpl.class);
 
   private SparkSessionManagerImpl sessionManagerHS2 = null;
   private boolean anyFailedSessionThread; // updated only when a thread has failed.
+  private static HiveConf SESSION_HIVE_CONF = new HiveConf();
+
+  @BeforeClass
+  public static void setup() {
+    SessionState.start(SESSION_HIVE_CONF);
+  }
 
 
   /** Tests CLI scenario where we get a single session and use it multiple times. */
@@ -82,7 +94,7 @@ public class TestSparkSessionManagerImpl {
 
     List<Thread> threadList = new ArrayList<Thread>();
     for (int i = 0; i < 10; i++) {
-      Thread t = new Thread(new SessionThread(), "Session thread " + i);
+      Thread t = new Thread(new SessionThread(SessionState.get()), "Session thread " + i);
       t.start();
       threadList.add(t);
     }
@@ -185,6 +197,23 @@ public class TestSparkSessionManagerImpl {
         "java.lang.NoClassDefFoundError: org/apache/spark/SparkConf");
   }
 
+  @Test
+  public void testGetSessionId() throws HiveException {
+    SessionState ss = SessionState.start(SESSION_HIVE_CONF);
+    SparkSessionManager ssm = SparkSessionManagerImpl.getInstance();
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("0", ss.getSparkSession().getSessionId());
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("1", ss.getSparkSession().getSessionId());
+
+    ss = SessionState.start(SESSION_HIVE_CONF);
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("0", ss.getSparkSession().getSessionId());
+  }
+
   private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) {
     checkHiveException(ss, e, expectedErrMsg, null);
   }
@@ -220,10 +249,16 @@ public class TestSparkSessionManagerImpl {
   /* Thread simulating a user session in HiveServer2. */
   public class SessionThread implements Runnable {
 
+    private final SessionState ss;
+
+    private SessionThread(SessionState ss) {
+      this.ss = ss;
+    }
 
     @Override
     public void run() {
       try {
+        SessionState.setCurrentSessionState(ss);
         Random random = new Random(Thread.currentThread().getId());
         String threadName = Thread.currentThread().getName();
         System.out.println(threadName + " started.");


Mime
View raw message