hive-commits mailing list archives

From ser...@apache.org
Subject [1/2] hive git commit: HIVE-12445 : Tracking of completed dags is a slow memory leak (Sergey Shelukhin, reviewed by Siddharth Seth)
Date Sat, 12 Dec 2015 00:07:05 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.0 005eb6181 -> 728c18e92
  refs/heads/master fc19f6bf3 -> 747384b13


HIVE-12445 : Tracking of completed dags is a slow memory leak (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/747384b1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/747384b1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/747384b1

Branch: refs/heads/master
Commit: 747384b13dcc0180d33c7adf0aa5aaf7720b1c41
Parents: fc19f6b
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Fri Dec 11 15:58:59 2015 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Fri Dec 11 15:58:59 2015 -0800

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/QueryFileCleaner.java |  96 ----------------
 .../hive/llap/daemon/impl/QueryTracker.java     | 114 +++++++++++++++----
 2 files changed, 93 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
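The gist of the patch: the standalone QueryFileCleaner service is deleted, its scheduled executor is folded into QueryTracker, and completed DAG names, which were previously retained in completedDagMap forever (the slow leak), are now evicted an hour after completion. Below is a minimal standalone sketch of that eviction pattern; the class name CompletedDagTracker and the single-threaded scheduler are illustrative assumptions, not the committed code, which uses CallableWithNdc and a daemon thread pool instead.

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CompletedDagTracker {
  // Completed DAG names; entries are evicted on a timer so the set stays bounded.
  private final Set<String> completedDags =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  // Remember a completed DAG and schedule its eviction after one hour.
  public void rememberCompletedDag(final String dagName) {
    if (completedDags.add(dagName)) {
      scheduler.schedule(new Runnable() {
        @Override
        public void run() {
          completedDags.remove(dagName);
        }
      }, 1, TimeUnit.HOURS);
    }
  }

  // Late fragments consult this to decide whether to reject registration.
  public boolean isComplete(String dagName) {
    return completedDags.contains(dagName);
  }
}

The one-hour retention window is the trade-off at the heart of the fix: it bounds the set's memory footprint while still rejecting out-of-order fragments that arrive shortly after their DAG completes.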


http://git-wip-us.apache.org/repos/asf/hive/blob/747384b1/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
deleted file mode 100644
index def1f9b..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.daemon.impl;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.common.CallableWithNdc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QueryFileCleaner extends AbstractService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(QueryFileCleaner.class);
-
-  private final ListeningScheduledExecutorService executorService;
-  private final FileSystem localFs;
-
-
-  public QueryFileCleaner(Configuration conf, FileSystem localFs) {
-    super(QueryFileCleaner.class.getName());
-    int numCleanerThreads = HiveConf.getIntVar(
-        conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
-    ScheduledExecutorService rawExecutor = Executors.newScheduledThreadPool(numCleanerThreads,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
-    this.executorService = MoreExecutors.listeningDecorator(rawExecutor);
-    this.localFs = localFs;
-  }
-
-  public void serviceStart() {
-    LOG.info(getName() + " started");
-  }
-
-  @Override
-  public void serviceStop() {
-    executorService.shutdownNow();
-    LOG.info(getName() + " stopped");
-  }
-
-  public void cleanupDir(String dir, long deleteDelay) {
-    LOG.info("Scheduling deletion of {} after {} seconds", dir, deleteDelay);
-    executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS);
-  }
-
-  private class FileCleanerCallable extends CallableWithNdc<Void> {
-
-    private final String dirToDelete;
-
-    private FileCleanerCallable(String dirToDelete) {
-      this.dirToDelete = dirToDelete;
-    }
-
-    @Override
-    protected Void callInternal() {
-      Path pathToDelete = new Path(dirToDelete);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Deleting path: " + pathToDelete);
-      }
-      try {
-        localFs.delete(new Path(dirToDelete), true);
-      } catch (IOException e) {
-        LOG.warn("Ignoring exception while cleaning up path: " + pathToDelete, e);
-      }
-      return null;
-    }
-  }
-}
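For reference, the deleted class above amounts to the delayed-deletion pattern sketched below, which QueryTracker now inlines (see the second hunk of this commit): directory removal is scheduled after a delay rather than performed immediately, presumably so that late readers such as shuffle fetches can still reach the files. This is a condensed sketch with an assumed name (DelayedDirCleaner), not the committed code.

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DelayedDirCleaner {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final FileSystem localFs;

  public DelayedDirCleaner(FileSystem localFs) {
    this.localFs = localFs;
  }

  // Schedule recursive deletion of dir after deleteDelaySeconds.
  public void cleanupDir(final String dir, long deleteDelaySeconds) {
    scheduler.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          localFs.delete(new Path(dir), true); // recursive delete
        } catch (IOException e) {
          // Best-effort cleanup: the original logs and ignores failures here.
        }
      }
    }, deleteDelaySeconds, TimeUnit.SECONDS);
  }
}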

http://git-wip-us.apache.org/repos/asf/hive/blob/747384b1/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 33d5671..6deaefc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -14,6 +14,14 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.CallableWithNdc;
+
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -23,7 +31,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
-import org.apache.hadoop.service.CompositeService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +50,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  * Tracks queries running within a daemon
  */
-public class QueryTracker extends CompositeService {
+public class QueryTracker extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
-  private final QueryFileCleaner queryFileCleaner;
+
+  private final ScheduledExecutorService executorService;
 
  // TODO Make use of the query id for caching when this is available.
   private final ConcurrentHashMap<String, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
@@ -63,7 +71,8 @@ public class QueryTracker extends CompositeService {
   // Multiple threads communicating from a single AM gets in the way of this.
 
   // Keeps track of completed dags. Assumes dag names are unique across AMs.
-  private final Set<String> completedDagMap = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final Set<String> completedDagMap = Collections.newSetFromMap(
+      new ConcurrentHashMap<String, Boolean>());
 
 
   private final Lock lock = new ReentrantLock();
@@ -71,7 +80,8 @@ public class QueryTracker extends CompositeService {
 
   // Tracks various maps for dagCompletions. This is setup here since stateChange messages
   // may be processed by a thread which ends up executing before a task.
-  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>>
+    sourceCompletionMap = new ConcurrentHashMap<>();
 
   // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor,
   // all the other existing code passes queryId equal to 0 everywhere.
@@ -89,12 +99,12 @@ public class QueryTracker extends CompositeService {
     this.defaultDeleteDelaySeconds = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
 
-    queryFileCleaner = new QueryFileCleaner(conf, localFs);
-    addService(queryFileCleaner);
+    int numCleanerThreads = HiveConf.getIntVar(
+        conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
+    this.executorService = Executors.newScheduledThreadPool(numCleanerThreads,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
   }
 
-
-
   /**
    * Register a new fragment for a specific query
    * @param queryId
@@ -107,10 +117,9 @@ public class QueryTracker extends CompositeService {
    * @param user
    * @throws IOException
    */
-  QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier,
-                        String vertexName, int fragmentNumber, int attemptNumber,
-                        String user, FragmentSpecProto fragmentSpec) throws
-      IOException {
+  QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName,
+      int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
+      FragmentSpecProto fragmentSpec) throws IOException {
     ReadWriteLock dagLock = getDagLock(dagName);
     dagLock.readLock().lock();
     try {
@@ -126,8 +135,8 @@ public class QueryTracker extends CompositeService {
         // Cleanup the dag lock here, since it may have been created after the query completed
         dagSpecificLocks.remove(dagName);
         throw new RuntimeException(
-            "Dag " + dagName + " already complete. Rejecting fragment [" + vertexName + ",
" + fragmentNumber +
-                ", " + attemptNumber);
+            "Dag " + dagName + " already complete. Rejecting fragment ["
+                + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
       }
     } finally {
       dagLock.readLock().unlock();
@@ -163,9 +172,9 @@ public class QueryTracker extends CompositeService {
     ReadWriteLock dagLock = getDagLock(dagName);
     dagLock.writeLock().lock();
     try {
-      completedDagMap.add(dagName);
-      LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName,
-          deleteDelay);
+      rememberCompletedDag(dagName);
+      LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds",
+          dagName, deleteDelay);
       QueryInfo queryInfo = queryInfoMap.remove(dagName);
       if (queryInfo == null) {
         LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
@@ -174,14 +183,14 @@ public class QueryTracker extends CompositeService {
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
         for (String localDir : localDirs) {
-          queryFileCleaner.cleanupDir(localDir, deleteDelay);
+          cleanupDir(localDir, deleteDelay);
           ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
         }
       }
       // Clearing this before sending a kill is OK, since canFinish will change to false.
       // Ideally this should be a state machine where kills are issued to the executor,
-      // and the structures are cleaned up once all tasks complete. New requests, however, should not
-      // be allowed after a query complete is received.
+      // and the structures are cleaned up once all tasks complete. New requests, however,
+      // should not be allowed after a query complete is received.
       sourceCompletionMap.remove(dagName);
       String savedQueryId = dagNameToQueryId.remove(dagName);
       queryId = queryId == null ? savedQueryId : queryId;
@@ -195,6 +204,17 @@ public class QueryTracker extends CompositeService {
     }
   }
 
+
+
+  public void rememberCompletedDag(String dagName) {
+    if (completedDagMap.add(dagName)) {
+      // We will remember a completed DAG for an hour to avoid executing out-of-order fragments.
+      executorService.schedule(new DagMapCleanerCallable(dagName), 1, TimeUnit.HOURS);
+    } else {
+      LOG.warn("Couldn't add {} to completed dag set", dagName);
+    }
+  }
+
   /**
    * Register an update to a source within an executing dag
    * @param dagName
@@ -242,4 +262,56 @@ public class QueryTracker extends CompositeService {
     if (queryId == null) return;
     dagNameToQueryId.putIfAbsent(dagName, queryId);
   }
+
+  @Override
+  public void serviceStart() {
+    LOG.info(getName() + " started");
+  }
+
+  @Override
+  public void serviceStop() {
+    executorService.shutdownNow();
+    LOG.info(getName() + " stopped");
+  }
+
+  private void cleanupDir(String dir, long deleteDelay) {
+    LOG.info("Scheduling deletion of {} after {} seconds", dir, deleteDelay);
+    executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS);
+  }
+
+  private class FileCleanerCallable extends CallableWithNdc<Void> {
+    private final String dirToDelete;
+
+    private FileCleanerCallable(String dirToDelete) {
+      this.dirToDelete = dirToDelete;
+    }
+
+    @Override
+    protected Void callInternal() {
+      Path pathToDelete = new Path(dirToDelete);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting path: " + pathToDelete);
+      }
+      try {
+        localFs.delete(new Path(dirToDelete), true);
+      } catch (IOException e) {
+        LOG.warn("Ignoring exception while cleaning up path: " + pathToDelete, e);
+      }
+      return null;
+    }
+  }
+
+  private class DagMapCleanerCallable extends CallableWithNdc<Void> {
+    private final String dagName;
+
+    private DagMapCleanerCallable(String dagName) {
+      this.dagName = dagName;
+    }
+
+    @Override
+    protected Void callInternal() {
+      completedDagMap.remove(dagName);
+      return null;
+    }
+  }
 }

