hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vik...@apache.org
Subject hive git commit: HIVE-12797: Synchronization issues with tez/llap session pool in hs2 (Vikram Dixit K, reviewed by Sergey Shelukhin)
Date Fri, 22 Jan 2016 00:56:02 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.0 a8f86e16a -> 80f80b0e3


HIVE-12797: Synchronization issues with tez/llap session pool in hs2 (Vikram Dixit K, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/80f80b0e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/80f80b0e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/80f80b0e

Branch: refs/heads/branch-2.0
Commit: 80f80b0e3c5f82e0d2d8078593f0d2b98ae24e0d
Parents: a8f86e1
Author: vikram <vikram@hortonworks.com>
Authored: Thu Jan 21 16:31:45 2016 -0800
Committer: vikram <vikram@hortonworks.com>
Committed: Thu Jan 21 16:56:21 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  | 31 +++++++------
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 46 +++++++++++++-------
 2 files changed, 48 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/80f80b0e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 479bc93..c8d135e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -110,21 +110,18 @@ public class TezJobMonitor {
   private final NumberFormat secondsFormat;
   private final NumberFormat commaFormat;
   private static final List<DAGClient> shutdownList;
-  private Map<String, BaseWork> workMap;
+  private final Map<String, BaseWork> workMap;
 
   private StringBuffer diagnostics;
 
   static {
-    shutdownList = Collections.synchronizedList(new LinkedList<DAGClient>());
+    shutdownList = new LinkedList<DAGClient>();
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
         TezJobMonitor.killRunningJobs();
         try {
-          for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) {
-            System.err.println("Shutting down tez session.");
-            TezSessionPoolManager.getInstance().closeIfNotDefault(s, false);
-          }
+          TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
         } catch (Exception e) {
           // ignore
         }
@@ -225,7 +222,9 @@ public class TezJobMonitor {
       Utilities.isPerfOrAboveLogging(conf);
 
     boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
-    shutdownList.add(dagClient);
+    synchronized(shutdownList) {
+      shutdownList.add(dagClient);
+    }
     console.printInfo("\n");
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
@@ -350,7 +349,9 @@ public class TezJobMonitor {
               diagnostics.append(diag);
             }
           }
-          shutdownList.remove(dagClient);
+          synchronized(shutdownList) {
+            shutdownList.remove(dagClient);
+          }
           break;
         }
       }
@@ -376,12 +377,14 @@ public class TezJobMonitor {
    * currently running tez queries. No guarantees, best effort only.
    */
   public static void killRunningJobs() {
-    for (DAGClient c: shutdownList) {
-      try {
-        System.err.println("Trying to shutdown DAG");
-        c.tryKillDAG();
-      } catch (Exception e) {
-        // ignore
+    synchronized (shutdownList) {
+      for (DAGClient c : shutdownList) {
+        try {
+          System.err.println("Trying to shutdown DAG");
+          c.tryKillDAG();
+        } catch (Exception e) {
+          // ignore
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/80f80b0e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 3bfe35a..1321b5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -85,12 +85,9 @@ public class TezSessionPoolManager {
 
   private boolean inited = false;
 
-
-
   private static TezSessionPoolManager sessionPool = null;
 
-  private static List<TezSessionState> openSessions = Collections
-      .synchronizedList(new LinkedList<TezSessionState>());
+  private static List<TezSessionState> openSessions = new LinkedList<TezSessionState>();
 
   public static TezSessionPoolManager getInstance()
       throws Exception {
@@ -137,6 +134,7 @@ public class TezSessionPoolManager {
             + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms");
       }
      expirationQueue = new PriorityBlockingQueue<>(11, new Comparator<TezSessionPoolSession>() {
+        @Override
         public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) {
           assert o1.expirationNs != null && o2.expirationNs != null;
           return o1.expirationNs.compareTo(o2.expirationNs);
@@ -144,11 +142,13 @@ public class TezSessionPoolManager {
       });
       restartQueue = new LinkedBlockingQueue<>();
       expirationThread = new Thread(new Runnable() {
+        @Override
         public void run() {
           runExpirationThread();
         }
       }, "TezSessionPool-expiration");
       restartThread = new Thread(new Runnable() {
+        @Override
         public void run() {
           runRestartThread();
         }
@@ -279,13 +279,15 @@ public class TezSessionPoolManager {
       return;
     }
 
-    // we can just stop all the sessions
-    Iterator<TezSessionState> iter = openSessions.iterator();
-    while (iter.hasNext()) {
-      TezSessionState sessionState = iter.next();
-      if (sessionState.isDefault()) {
-        sessionState.close(false);
-        iter.remove();
+    synchronized (openSessions) {
+      // we can just stop all the sessions
+      Iterator<TezSessionState> iter = openSessions.iterator();
+      while (iter.hasNext()) {
+        TezSessionState sessionState = iter.next();
+        if (sessionState.isDefault()) {
+          sessionState.close(false);
+          iter.remove();
+        }
       }
     }
 
@@ -402,8 +404,18 @@ public class TezSessionPoolManager {
     sessionState.open(conf, additionalFiles);
   }
 
-  public List<TezSessionState> getOpenSessions() {
-    return openSessions;
+  public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
+    synchronized (openSessions) {
+      Iterator<TezSessionState> iter = openSessions.iterator();
+      while (iter.hasNext()) {
+        System.err.println("Shutting down tez session.");
+        TezSessionState sessionState = iter.next();
+        closeIfNotDefault(sessionState, keepTmpDir);
+        if (sessionState.isDefault() == false) {
+          iter.remove();
+        }
+      }
+    }
   }
 
   private void closeAndReopen(TezSessionPoolSession oldSession) throws Exception {
@@ -522,7 +534,9 @@ public class TezSessionPoolManager {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Closed a pool session [" + this + "]");
         }
-        openSessions.remove(this);
+        synchronized (openSessions) {
+          openSessions.remove(this);
+        }
         if (parent.expirationQueue != null) {
           parent.expirationQueue.remove(this);
         }
@@ -534,7 +548,9 @@ public class TezSessionPoolManager {
         boolean isAsync, LogHelper console, Path scratchDir)
             throws IOException, LoginException, URISyntaxException, TezException {
       super.openInternal(conf, additionalFiles, isAsync, console, scratchDir);
-      openSessions.add(this);
+      synchronized (openSessions) {
+        openSessions.add(this);
+      }
       if (parent.expirationQueue != null) {
         long jitterModMs = (long)(parent.sessionLifetimeJitterMs * rdm.nextFloat());
         expirationNs = System.nanoTime() + (parent.sessionLifetimeMs + jitterModMs) * 1000000L;


Mime
View raw message