hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject hive git commit: HIVE-13901 : Hivemetastore add partitions can be slow depending on filesystems (Rajesh Balamohan via Sergey Shelukhin)
Date Sat, 09 Jul 2016 15:27:00 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.1 0ba089b01 -> f361403f9


HIVE-13901 : Hivemetastore add partitions can be slow depending on filesystems (Rajesh Balamohan via Sergey Shelukhin)

Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f361403f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f361403f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f361403f

Branch: refs/heads/branch-2.1
Commit: f361403f93de533fe3818e7580c84a5f32e2079a
Parents: 0ba089b
Author: Rajesh Balamohan <rbalamohan at apache dot org>
Authored: Tue May 31 20:08:00 2016 -0800
Committer: Ashutosh Chauhan <hashutosh@apache.org>
Committed: Sat Jul 9 08:20:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../hadoop/hive/metastore/HiveMetaStore.java    | 135 +++++++++++++------
 .../test/results/clientnegative/external2.q.out |   2 +-
 3 files changed, 100 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f361403f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 88abb3f..e55d654 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -569,6 +569,8 @@ public class HiveConf extends Configuration {
         "Used to avoid all of the proxies and object copies in the metastore.  Note, if this
is " +
             "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise
" +
             "undefined and most likely undesired behavior will result"),
+    METASTORE_FS_HANDLER_THREADS_COUNT("hive.metastore.fshandler.threads", 20,
+        "Number of threads to be allocated for metastore handler for fs operations."),
     METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000,
"Maximum number of " +
         "objects we will place in the hbase metastore catalog cache.  The objects will be
divided up by " +
         "types that we need to cache."),

http://git-wip-us.apache.org/repos/asf/hive/blob/f361403f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c0827ea..bd2ea97 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
@@ -131,10 +133,14 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -259,6 +265,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         };
 
+    private static ExecutorService threadPool;
+
     public static final String AUDIT_FORMAT =
         "ugi=%s\t" + // ugi
             "ip=%s\t" + // remote IP
@@ -352,6 +360,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
       super(name);
       hiveConf = conf;
+      synchronized (HMSHandler.class) {
+        if (threadPool == null) {
+          int numThreads = HiveConf.getIntVar(conf,
+              ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT);
+          threadPool = Executors.newFixedThreadPool(numThreads,
+              new ThreadFactoryBuilder().setDaemon(true)
+                  .setNameFormat("HMSHandler #%d").build());
+        }
+      }
       if (init) {
         init();
       }
@@ -2268,15 +2285,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     }
 
-    private List<Partition> add_partitions_core(
-        RawStore ms, String dbName, String tblName, List<Partition> parts, boolean
ifNotExists)
+    private List<Partition> add_partitions_core(final RawStore ms,
+        String dbName, String tblName, List<Partition> parts, final boolean ifNotExists)
             throws MetaException, InvalidObjectException, AlreadyExistsException, TException
{
       logInfo("add_partitions");
       boolean success = false;
       // Ensures that the list doesn't have dups, and keeps track of directories we have
created.
-      Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper,
Boolean>();
-      List<Partition> result = new ArrayList<Partition>();
-      List<Partition> existingParts = null;
+      final Map<PartValEqWrapper, Boolean> addedPartitions =
+          Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
+      final List<Partition> result = new ArrayList<Partition>();
+      final List<Partition> existingParts = new ArrayList<Partition>();;
       Table tbl = null;
       try {
         ms.openTransaction();
@@ -2290,29 +2308,51 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
         }
 
-        for (Partition part : parts) {
+        List<Future<Partition>> partFutures = Lists.newArrayList();
+
+        final Table table = tbl;
+        for (final Partition part : parts) {
           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
             throw new MetaException("Partition does not belong to target table "
                 + dbName + "." + tblName + ": " + part);
           }
+
           boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
           if (!shouldAdd) {
-            if (existingParts == null) {
-              existingParts = new ArrayList<Partition>();
-            }
             existingParts.add(part);
             LOG.info("Not adding partition " + part + " as it already exists");
             continue;
           }
-          boolean madeDir = createLocationForAddedPartition(tbl, part);
-          if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
-            // Technically, for ifNotExists case, we could insert one and discard the other
-            // because the first one now "exists", but it seems better to report the problem
-            // upstream as such a command doesn't make sense.
-            throw new MetaException("Duplicate partitions in the list: " + part);
+
+
+          partFutures.add(threadPool.submit(new Callable() {
+            @Override
+            public Partition call() throws Exception {
+              boolean madeDir = createLocationForAddedPartition(table, part);
+              if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
+                // Technically, for ifNotExists case, we could insert one and discard the
other
+                // because the first one now "exists", but it seems better to report the
problem
+                // upstream as such a command doesn't make sense.
+                throw new MetaException("Duplicate partitions in the list: " + part);
+              }
+              initializeAddedPartition(table, part, madeDir);
+              return part;
+            }
+          }));
+        }
+        try {
+          for (Future<Partition> partFuture : partFutures) {
+            Partition part = partFuture.get();
+            if (part != null) {
+              result.add(part);
+            }
           }
-          initializeAddedPartition(tbl, part, madeDir);
-          result.add(part);
+        } catch (InterruptedException | ExecutionException e) {
+          // cancel other tasks
+          for (Future<Partition> partFuture : partFutures) {
+            partFuture.cancel(true);
+          }
+          throw new MetaException(e.getMessage());
         }
         if (!result.isEmpty()) {
           success = ms.addPartitions(dbName, tblName, result);
@@ -2323,10 +2363,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       } finally {
         if (!success) {
           ms.rollbackTransaction();
-          for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
+          for (Map.Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet())
{
             if (e.getValue()) {
+              // we just created this directory - it's not a case of pre-creation, so we
nuke.
               wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
-              // we just created this directory - it's not a case of pre-creation, so we
nuke
             }
           }
           fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
@@ -2415,9 +2455,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         throws TException {
       boolean success = false;
       // Ensures that the list doesn't have dups, and keeps track of directories we have
created.
-      Map<PartValEqWrapperLite, Boolean> addedPartitions = new HashMap<PartValEqWrapperLite,
Boolean>();
+      final Map<PartValEqWrapperLite, Boolean> addedPartitions =
+          Collections.synchronizedMap(new HashMap<PartValEqWrapperLite, Boolean>());
       PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partSpecs);
-      PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy.getPartitionIterator();
+      final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy
+          .getPartitionIterator();
       Table tbl = null;
       try {
         ms.openTransaction();
@@ -2429,10 +2471,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this));
 
-        int nPartitions = 0;
+        List<Future<Partition>> partFutures = Lists.newArrayList();
+        final Table table = tbl;
+
         while(partitionIterator.hasNext()) {
 
-          Partition part = partitionIterator.getCurrent();
+          final Partition part = partitionIterator.getCurrent();
 
           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
             throw new MetaException("Partition does not belong to target table "
@@ -2443,30 +2487,45 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             LOG.info("Not adding partition " + part + " as it already exists");
             continue;
           }
-          boolean madeDir = createLocationForAddedPartition(tbl, part);
-          if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) {
-            // Technically, for ifNotExists case, we could insert one and discard the other
-            // because the first one now "exists", but it seems better to report the problem
-            // upstream as such a command doesn't make sense.
-            throw new MetaException("Duplicate partitions in the list: " + part);
-          }
-          initializeAddedPartition(tbl, partitionIterator, madeDir);
-
-          ++nPartitions;
+          partFutures.add(threadPool.submit(new Callable() {
+            @Override public Object call() throws Exception {
+              boolean madeDir = createLocationForAddedPartition(table, part);
+              if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) {
+                // Technically, for ifNotExists case, we could insert one and discard the
other
+                // because the first one now "exists", but it seems better to report the
problem
+                // upstream as such a command doesn't make sense.
+                throw new MetaException("Duplicate partitions in the list: " + part);
+              }
+              initializeAddedPartition(table, part, madeDir);
+              return part;
+            }
+          }));
           partitionIterator.next();
         }
 
+        try {
+          for (Future<Partition> partFuture : partFutures) {
+            Partition part = partFuture.get();
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          // cancel other tasks
+          for (Future<Partition> partFuture : partFutures) {
+            partFuture.cancel(true);
+          }
+          throw new MetaException(e.getMessage());
+        }
+
         success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists)
-               && ms.commitTransaction();
+            && ms.commitTransaction();
 
-        return nPartitions;
+        return addedPartitions.size();
       } finally {
         if (!success) {
           ms.rollbackTransaction();
-          for (Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet())
{
+          for (Map.Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet())
{
             if (e.getValue()) {
+              // we just created this directory - it's not a case of pre-creation, so we
nuke.
               wh.deleteDir(new Path(e.getKey().location), true);
-              // we just created this directory - it's not a case of pre-creation, so we
nuke
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/f361403f/ql/src/test/results/clientnegative/external2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/external2.q.out b/ql/src/test/results/clientnegative/external2.q.out
index 4fc7c11..91b5de4 100644
--- a/ql/src/test/results/clientnegative/external2.q.out
+++ b/ql/src/test/results/clientnegative/external2.q.out
@@ -10,4 +10,4 @@ POSTHOOK: Output: default@external2
 PREHOOK: type: ALTERTABLE_ADDPARTS
 #### A masked pattern was here ####
 PREHOOK: Output: default@external2
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got
exception: java.io.IOException No FileSystem for scheme: invalidscheme)
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:MetaException(message:Got
exception: java.io.IOException No FileSystem for scheme: invalidscheme))


Mime
View raw message