hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [13/17] hive git commit: HIVE-17643 : recent WM changes broke reopen due to spurious overloads (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Fri, 29 Sep 2017 21:01:54 GMT
HIVE-17643 : recent WM changes broke reopen due to spurious overloads (Sergey Shelukhin, reviewed
by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a860795c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a860795c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a860795c

Branch: refs/heads/hive-14535
Commit: a860795ce011f024b7b66bba4a000a28eb9fbc9d
Parents: 3a5565e
Author: sergey <sershe@apache.org>
Authored: Fri Sep 29 13:31:40 2017 -0700
Committer: sergey <sershe@apache.org>
Committed: Fri Sep 29 13:56:18 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/tez/TezSessionPoolManager.java  | 19 ++++---------------
 .../hive/ql/exec/tez/TezSessionPoolSession.java  |  4 ++--
 .../hadoop/hive/ql/exec/tez/TezSessionState.java |  4 ++--
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java  |  3 +++
 .../hadoop/hive/ql/exec/tez/WorkloadManager.java | 11 ++++-------
 .../hive/ql/exec/tez/TestTezSessionPool.java     | 16 ++++++++--------
 6 files changed, 23 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a860795c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 538d745..edcecb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-
 import java.util.HashSet;
 
 import java.util.concurrent.Semaphore;
@@ -321,7 +319,8 @@ public class TezSessionPoolManager
    *          the session to be closed
    * @throws Exception
    */
-  public void destroySession(TezSessionState tezSessionState) throws Exception {
+  @Override
+  public void destroy(TezSessionState tezSessionState) throws Exception {
     LOG.warn("We are closing a " + (tezSessionState.isDefault() ? "default" : "non-default")
         + " session because of retry failure.");
     tezSessionState.close(false);
@@ -398,7 +397,8 @@ public class TezSessionPoolManager
   }
 
   /** Reopens the session that was found to not be running. */
-  public TezSessionState reopenSession(TezSessionState sessionState,
+  @Override
+  public TezSessionState reopen(TezSessionState sessionState,
       Configuration conf, String[] additionalFiles) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
     if (sessionState.getQueueName() != null
@@ -467,15 +467,4 @@ public class TezSessionPoolManager
   public SessionExpirationTracker getExpirationTracker() {
     return expirationTracker;
   }
-
-  @Override
-  public TezSessionPoolSession reopen(
-      TezSessionPoolSession session, Configuration conf, String[] inputOutputJars) {
-    return reopen(session, conf, inputOutputJars);
-  }
-
-  @Override
-  public void destroy(TezSessionPoolSession session) throws Exception {
-    destroySession(session);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a860795c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 4488c12..694f15b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -56,9 +56,9 @@ class TezSessionPoolSession extends TezSessionState {
     void registerOpenSession(TezSessionPoolSession session);
     void unregisterOpenSession(TezSessionPoolSession session);
     void returnAfterUse(TezSessionPoolSession session) throws Exception;
-    TezSessionState reopen(TezSessionPoolSession session, Configuration conf,
+    TezSessionState reopen(TezSessionState session, Configuration conf,
         String[] inputOutputJars) throws Exception;
-    void destroy(TezSessionPoolSession session) throws Exception;
+    void destroy(TezSessionState session) throws Exception;
   }
 
   private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE);

http://git-wip-us.apache.org/repos/asf/hive/blob/a860795c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index e5850f9..363443d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -744,11 +744,11 @@ public class TezSessionState {
   public TezSessionState reopen(
       Configuration conf, String[] inputOutputJars) throws Exception {
     // By default, TezSessionPoolManager handles this for both pool and non-pool session.
-    return TezSessionPoolManager.getInstance().reopenSession(this, conf, inputOutputJars);
+    return TezSessionPoolManager.getInstance().reopen(this, conf, inputOutputJars);
   }
 
   public void destroy() throws Exception {
     // By default, TezSessionPoolManager handles this for both pool and non-pool session.
-    TezSessionPoolManager.getInstance().destroySession(this);
+    TezSessionPoolManager.getInstance().destroy(this);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a860795c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 29d6fe6..28d91cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -236,6 +236,9 @@ public class TezTask extends Task<TezWork> {
           counters = null;
         }
       } finally {
+        // Note: due to TEZ-3846, the session may actually be invalid in case of some errors.
+        //       Currently, reopen on an attempted reuse will take care of that; we cannot
tell
+        //       if the session is usable until we try.
         // We return this to the pool even if it's unusable; reopen is supposed to handle
this.
         try {
           session.returnToSessionManager();

http://git-wip-us.apache.org/repos/asf/hive/blob/a860795c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 288d705..3f62127 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -21,15 +21,12 @@ import java.util.concurrent.TimeoutException;
 
 import java.util.concurrent.TimeUnit;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -308,8 +305,8 @@ public class WorkloadManager
     sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null,
null);
   }
 
-  private WmTezSession ensureOwnedSession(TezSessionPoolSession oldSession) {
-    if (!(oldSession instanceof WmTezSession) || !oldSession.isOwnedBy(this)) {
+  private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
+    if (!(oldSession instanceof WmTezSession) || !((WmTezSession)oldSession).isOwnedBy(this))
{
       throw new AssertionError("Not a WM session " + oldSession);
     }
     WmTezSession session = (WmTezSession) oldSession;
@@ -338,7 +335,7 @@ public class WorkloadManager
   }
 
   @Override
-  public TezSessionState reopen(TezSessionPoolSession session, Configuration conf,
+  public TezSessionState reopen(TezSessionState session, Configuration conf,
       String[] additionalFiles) throws Exception {
     WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession();
     newSession.setPoolName(oldSession.getPoolName());
@@ -358,7 +355,7 @@ public class WorkloadManager
   }
 
   @Override
-  public void destroy(TezSessionPoolSession session) throws Exception {
+  public void destroy(TezSessionState session) throws Exception {
     LOG.warn("Closing a pool session because of retry failure.");
     // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution.
     WmTezSession wmSession = ensureOwnedSession(session);

http://git-wip-us.apache.org/repos/asf/hive/blob/a860795c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index b9f9f5e..05eb761 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -190,7 +190,7 @@ public class TestTezSessionPool {
       Mockito.when(session.isDefault()).thenReturn(false);
       Mockito.when(session.getConf()).thenReturn(conf);
 
-      poolManager.reopenSession(session, conf, null);
+      poolManager.reopen(session, conf, null);
 
       Mockito.verify(session).close(true);
       Mockito.verify(session).open(new HashSet<String>(), null);
@@ -200,12 +200,12 @@ public class TestTezSessionPool {
 
       // user explicitly specified queue name
       conf.set("tez.queue.name", "tezq1");
-      poolManager.reopenSession(session, conf, null);
+      poolManager.reopen(session, conf, null);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // user unsets queue name, will fallback to default session queue
       conf.unset("tez.queue.name");
-      poolManager.reopenSession(session, conf, null);
+      poolManager.reopen(session, conf, null);
       assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // session.open will unset the queue name from conf but Mockito intercepts the open
call
@@ -213,17 +213,17 @@ public class TestTezSessionPool {
       conf.unset("tez.queue.name");
       // change session's default queue to tezq1 and rerun test sequence
       Mockito.when(session.getQueueName()).thenReturn("tezq1");
-      poolManager.reopenSession(session, conf, null);
+      poolManager.reopen(session, conf, null);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // user sets default queue now
       conf.set("tez.queue.name", "default");
-      poolManager.reopenSession(session, conf, null);
+      poolManager.reopen(session, conf, null);
       assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // user does not specify queue so use session default
       conf.unset("tez.queue.name");
-      poolManager.reopenSession(session, conf, null);
+      poolManager.reopen(session, conf, null);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
     } catch (Exception e) {
       e.printStackTrace();
@@ -328,7 +328,7 @@ public class TestTezSessionPool {
     Mockito.when(session.isDefault()).thenReturn(false);
     Mockito.when(session.getConf()).thenReturn(conf);
 
-    poolManager.reopenSession(session, conf, null);
+    poolManager.reopen(session, conf, null);
 
     Mockito.verify(session).close(true);
     Mockito.verify(session).open(new HashSet<String>(), null);
@@ -340,6 +340,6 @@ public class TestTezSessionPool {
     TezSessionState session = Mockito.mock(TezSessionState.class);
     Mockito.when(session.isDefault()).thenReturn(false);
 
-    poolManager.destroySession(session);
+    poolManager.destroy(session);
   }
 }


Mime
View raw message