hive-commits mailing list archives

From ser...@apache.org
Subject [33/50] [abbrv] hive git commit: HIVE-13966: DbNotificationListener: can lose DDL operation notifications (Mohit Sabharwal reviewed by Chaoyu Tang)
Date Tue, 15 Nov 2016 20:21:05 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/3918a639/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2a604bf..c0ef25e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,20 +18,14 @@
 
 package org.apache.hadoop.hive.metastore;
 
-import java.io.FileInputStream;
-
-import java.io.File;
-
+import org.apache.hadoop.hive.metastore.api.ClientCapabilities;
+import org.apache.hadoop.hive.metastore.api.ClientCapability;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.GetTablesRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesResult;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 
-import java.io.FileNotFoundException;
-
-import org.apache.thrift.transport.TIOStreamTransport;
-
-import java.io.FileOutputStream;
-
-import java.io.BufferedOutputStream;
-
 import com.facebook.fb303.FacebookBase;
 import com.facebook.fb303.fb_status;
 import com.google.common.annotations.VisibleForTesting;
@@ -41,8 +35,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
-
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -50,7 +44,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
 import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -63,8 +56,119 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
+import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
+import org.apache.hadoop.hive.metastore.api.TableStatsResult;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
@@ -107,8 +211,8 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager;
+import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -132,11 +236,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jdo.JDOException;
-
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.text.DateFormat;
@@ -158,8 +258,8 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -406,6 +506,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     private AlterHandler alterHandler;
     private List<MetaStorePreEventListener> preListeners;
     private List<MetaStoreEventListener> listeners;
+    private List<MetaStoreEventListener> transactionalListeners;
     private List<MetaStoreEndFunctionListener> endFunctionListeners;
     private List<MetaStoreInitListener> initListeners;
     private Pattern partitionValidationPattern;
@@ -418,6 +519,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     }
 
+    List<MetaStoreEventListener> getTransactionalListeners() {
+      return transactionalListeners;
+    }
+
     @Override
     public void init() throws MetaException {
       rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
@@ -489,7 +594,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
       listeners.add(new SessionPropertiesListener(hiveConf));
       listeners.add(new AcidEventListener(hiveConf));
-
+      transactionalListeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,hiveConf,
+              hiveConf.getVar(ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS));
       if (metrics != null) {
         listeners.add(new HMSMetricsListener(hiveConf, metrics));
       }
@@ -562,6 +668,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       for (MetaStoreEventListener listener : listeners) {
         listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value));
       }
+
+      if (transactionalListeners.size() > 0) {
+        // All the fields of this event are final, so no reason to create a new one for each
+        // listener
+        ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, value);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onConfigChange(cce);
+        }
+      }
     }
 
     @Override
@@ -858,19 +973,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       if (!validateName(db.getName(), null)) {
         throw new InvalidObjectException(db.getName() + " is not a valid database name");
       }
+
       if (null == db.getLocationUri()) {
         db.setLocationUri(wh.getDefaultDatabasePath(db.getName()).toString());
       } else {
         db.setLocationUri(wh.getDnsPath(new Path(db.getLocationUri())).toString());
       }
+
       Path dbPath = new Path(db.getLocationUri());
       boolean success = false;
       boolean madeDir = false;
-
       try {
-
         firePreEvent(new PreCreateDatabaseEvent(db, this));
-
         if (!wh.isDir(dbPath)) {
           if (!wh.mkdirs(dbPath, true)) {
             throw new MetaException("Unable to create database path " + dbPath +
@@ -881,6 +995,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.openTransaction();
         ms.createDatabase(db);
+        if (transactionalListeners.size() > 0) {
+          CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onCreateDatabase(cde);
+          }
+        }
+
         success = ms.commitTransaction();
       } finally {
         if (!success) {
@@ -918,7 +1039,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           Deadline.checkTimeout();
         }
-
         create_database_core(getMS(), db);
         success = true;
       } catch (Exception e) {
@@ -1088,6 +1208,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.dropDatabase(name)) {
+          if (transactionalListeners.size() > 0) {
+            DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropDatabase(dde);
+            }
+          }
           success = ms.commitTransaction();
         }
       } finally {
@@ -1337,10 +1463,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
 
     private void create_table_core(final RawStore ms, final Table tbl,
-        final EnvironmentContext envContext, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
+        final EnvironmentContext envContext, List<SQLPrimaryKey> primaryKeys,
+        List<SQLForeignKey> foreignKeys)
         throws AlreadyExistsException, MetaException,
         InvalidObjectException, NoSuchObjectException {
-
       if (!MetaStoreUtils.validateName(tbl.getTableName(), hiveConf)) {
         throw new InvalidObjectException(tbl.getTableName()
             + " is not a valid object name");
@@ -1427,8 +1553,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         } else {
           ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
         }
-        success = ms.commitTransaction();
 
+        if (transactionalListeners.size() > 0) {
+          CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this);
+          createTableEvent.setEnvironmentContext(envContext);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onCreateTable(createTableEvent);
+          }
+        }
+
+        success = ms.commitTransaction();
       } finally {
         if (!success) {
           ms.rollbackTransaction();
@@ -1653,13 +1787,20 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         // Drop the partitions and get a list of locations which need to be deleted
         partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
             tbl.getPartitionKeys(), deleteData && !isExternal);
-
         if (!ms.dropTable(dbname, name)) {
           String tableName = dbname + "." + name;
           throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
               "Unable to drop index table " + tableName + " for index " + indexName);
+        } else {
+          if (transactionalListeners.size() > 0) {
+            DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this);
+            dropTableEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropTable(dropTableEvent);
+            }
+          }
+          success = ms.commitTransaction();
         }
-        success = ms.commitTransaction();
       } finally {
         if (!success) {
           ms.rollbackTransaction();
@@ -2021,7 +2162,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           req.getDbName(), req.getTblNames(), req.getCapabilities()));
     }
 
-
     private List<Table> getTableObjectsInternal(
         String dbName, List<String> tableNames, ClientCapabilities capabilities)
             throws MetaException, InvalidOperationException, UnknownDBException {
@@ -2194,8 +2334,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir, envContext);
         }
 
-        success = ms.addPartition(part);
-        if (success) {
+        if (ms.addPartition(part)) {
+          if (transactionalListeners.size() > 0) {
+            AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this);
+            addPartitionEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onAddPartition(addPartitionEvent);
+            }
+          }
+
           success = ms.commitTransaction();
         }
       } finally {
@@ -2348,13 +2495,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
     private List<Partition> add_partitions_core(final RawStore ms,
         String dbName, String tblName, List<Partition> parts, final boolean ifNotExists)
-            throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
+        throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
       logInfo("add_partitions");
       boolean success = false;
       // Ensures that the list doesn't have dups, and keeps track of directories we have created.
       final Map<PartValEqWrapper, Boolean> addedPartitions =
           Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
-      final List<Partition> result = new ArrayList<Partition>();
+      final List<Partition> newParts = new ArrayList<Partition>();
       final List<Partition> existingParts = new ArrayList<Partition>();;
       Table tbl = null;
       try {
@@ -2370,7 +2517,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         List<Future<Partition>> partFutures = Lists.newArrayList();
-
         final Table table = tbl;
         for (final Partition part : parts) {
           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
@@ -2391,6 +2537,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
+
           partFutures.add(threadPool.submit(new Callable() {
             @Override
             public Partition call() throws Exception {
@@ -2416,11 +2563,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             }
           }));
         }
+
         try {
           for (Future<Partition> partFuture : partFutures) {
             Partition part = partFuture.get();
             if (part != null) {
-              result.add(part);
+              newParts.add(part);
             }
           }
         } catch (InterruptedException | ExecutionException e) {
@@ -2430,12 +2578,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           throw new MetaException(e.getMessage());
         }
-        if (!result.isEmpty()) {
-          success = ms.addPartitions(dbName, tblName, result);
+
+        if (!newParts.isEmpty()) {
+          success = ms.addPartitions(dbName, tblName, newParts);
         } else {
           success = true;
         }
-        success = success && ms.commitTransaction();
+
+        // Setting success to false to make sure that if the listener fails, rollback happens.
+        success = false;
+        // Notification is generated for newly created partitions only. The subset of partitions
+        // that already exist (existingParts), will not generate notifications.
+        fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
+        success = ms.commitTransaction();
       } finally {
         if (!success) {
           ms.rollbackTransaction();
@@ -2447,14 +2602,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
         } else {
-          fireMetaStoreAddPartitionEvent(tbl, result, null, true);
+          fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
           if (existingParts != null) {
             // The request has succeeded but we failed to add these partitions.
             fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
           }
         }
       }
-      return result;
+      return newParts;
     }
 
     @Override
@@ -2546,18 +2701,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this));
-
         List<Future<Partition>> partFutures = Lists.newArrayList();
         final Table table = tbl;
-
         while(partitionIterator.hasNext()) {
-
           final Partition part = partitionIterator.getCurrent();
 
           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
             throw new MetaException("Partition does not belong to target table "
                 + dbName + "." + tblName + ": " + part);
           }
+
           boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
           if (!shouldAdd) {
             LOG.info("Not adding partition " + part + " as it already exists");
@@ -2570,6 +2723,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
+
           partFutures.add(threadPool.submit(new Callable() {
             @Override public Object call() throws Exception {
               ugi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -2608,9 +2762,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw new MetaException(e.getMessage());
         }
 
-        success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists)
-            && ms.commitTransaction();
-
+        success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists);
+        //setting success to false to make sure that if the listener fails, rollback happens.
+        success = false;
+        fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true);
+        success = ms.commitTransaction();
         return addedPartitions.size();
       } finally {
         if (!success) {
@@ -2748,9 +2904,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(new Path(part.getSd().getLocation()), true);
           }
         }
+
+        // Setting success to false to make sure that if the listener fails, rollback happens.
+        success = false;
+        fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true);
         // we proceed only if we'd actually succeeded anyway, otherwise,
         // we'd have thrown an exception
-        success = success && ms.commitTransaction();
+        success = ms.commitTransaction();
       } finally {
         if (!success) {
           ms.rollbackTransaction();
@@ -2767,7 +2927,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         AddPartitionEvent addPartitionEvent =
             new AddPartitionEvent(tbl, parts, success, this);
         addPartitionEvent.setEnvironmentContext(envContext);
-
         for (MetaStoreEventListener listener : listeners) {
           listener.onAddPartition(addPartitionEvent);
         }
@@ -2781,13 +2940,39 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         AddPartitionEvent addPartitionEvent =
             new AddPartitionEvent(tbl, partitionSpec, success, this);
         addPartitionEvent.setEnvironmentContext(envContext);
-
         for (MetaStoreEventListener listener : listeners) {
           listener.onAddPartition(addPartitionEvent);
         }
       }
     }
 
+    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
+          final List<Partition> parts, final EnvironmentContext envContext, boolean success)
+            throws MetaException {
+      if (tbl != null && parts != null && !parts.isEmpty()) {
+        AddPartitionEvent addPartitionEvent =
+                new AddPartitionEvent(tbl, parts, success, this);
+        addPartitionEvent.setEnvironmentContext(envContext);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onAddPartition(addPartitionEvent);
+        }
+      }
+    }
+
+    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
+          final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
+            throws MetaException {
+      if (tbl != null && partitionSpec != null) {
+        AddPartitionEvent addPartitionEvent =
+                new AddPartitionEvent(tbl, partitionSpec, success, this);
+        addPartitionEvent.setEnvironmentContext(envContext);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onAddPartition(addPartitionEvent);
+        }
+      }
+    }
+
+
     @Override
     public Partition add_partition(final Partition part)
         throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -2949,8 +3134,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
           throw new MetaException("Unable to drop partition");
+        } else {
+          if (transactionalListeners.size() > 0) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(tbl, part, true, deleteData, this);
+            dropPartitionEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropPartition(dropPartitionEvent);
+            }
+          }
+          success = ms.commitTransaction();
         }
-        success = ms.commitTransaction();
       } finally {
         if (!success) {
           ms.rollbackTransaction();
@@ -3124,11 +3318,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         ms.dropPartitions(dbName, tblName, partNames);
+        if (parts != null && transactionalListeners.size() > 0) {
+          for (Partition part : parts) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(tbl, part, true, deleteData, this);
+            dropPartitionEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropPartition(dropPartitionEvent);
+            }
+          }
+        }
+
         success = ms.commitTransaction();
         DropPartitionsResult result = new DropPartitionsResult();
         if (needResult) {
           result.setPartitions(parts);
         }
+
         return result;
       } finally {
         if (!success) {
@@ -3536,8 +3742,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @Override
     public void alter_partition(final String db_name, final String tbl_name,
         final Partition new_part)
-        throws InvalidOperationException, MetaException,
-        TException {
+        throws InvalidOperationException, MetaException, TException {
       rename_partition(db_name, tbl_name, null, new_part);
     }
 
@@ -3561,8 +3766,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     private void rename_partition(final String db_name, final String tbl_name,
         final List<String> part_vals, final Partition new_part,
         final EnvironmentContext envContext)
-        throws InvalidOperationException, MetaException,
-        TException {
+        throws InvalidOperationException, MetaException, TException {
       startTableFunction("alter_partition", db_name, tbl_name);
 
       if (LOG.isInfoEnabled()) {
@@ -3585,13 +3789,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       try {
         firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
-
         if (part_vals != null && !part_vals.isEmpty()) {
           MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
               partitionValidationPattern);
         }
 
-        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part, envContext);
+        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part,
+                envContext, this);
 
         // Only fetch the table if we actually have a listener
         Table table = null;
@@ -3624,21 +3828,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       } finally {
         endFunction("alter_partition", oldPart != null, ex, tbl_name);
       }
-      return;
     }
 
     @Override
     public void alter_partitions(final String db_name, final String tbl_name,
-        final List<Partition> new_parts) throws InvalidOperationException, MetaException,
-        TException {
+        final List<Partition> new_parts)
+        throws InvalidOperationException, MetaException, TException {
       alter_partitions_with_environment_context(db_name, tbl_name, new_parts, null);
     }
 
     @Override
     public void alter_partitions_with_environment_context(final String db_name, final String tbl_name,
         final List<Partition> new_parts, EnvironmentContext environmentContext)
-        throws InvalidOperationException, MetaException,
-        TException {
+        throws InvalidOperationException, MetaException, TException {
 
       startTableFunction("alter_partitions", db_name, tbl_name);
 
@@ -3655,8 +3857,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         for (Partition tmpPart : new_parts) {
           firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, null, tmpPart, this));
         }
-        oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts, environmentContext);
-
+        oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts,
+                environmentContext, this);
         Iterator<Partition> olditr = oldParts.iterator();
         // Only fetch the table if we have a listener that needs it.
         Table table = null;
@@ -3697,7 +3899,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       } finally {
         endFunction("alter_partition", oldParts != null, ex, tbl_name);
       }
-      return;
     }
 
     @Override
@@ -3712,13 +3913,22 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false;
       Exception ex = null;
       Index oldIndex = null;
+      RawStore ms  = getMS();
       try {
+        ms.openTransaction();
         oldIndex = get_index_by_name(dbname, base_table_name, index_name);
 
         firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
 
-        getMS().alterIndex(dbname, base_table_name, index_name, newIndex);
-        success = true;
+        ms.alterIndex(dbname, base_table_name, index_name, newIndex);
+        if (transactionalListeners.size() > 0) {
+          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onAlterIndex(alterIndexEvent);
+          }
+        }
+
+        success = ms.commitTransaction();
       } catch (InvalidObjectException e) {
         ex = e;
         throw new InvalidOperationException(e.getMessage());
@@ -3732,13 +3942,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw newMetaException(e);
         }
       } finally {
+        if (!success) {
+          ms.rollbackTransaction();
+        }
+
         endFunction("alter_index", success, ex, base_table_name);
         for (MetaStoreEventListener listener : listeners) {
           AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this);
           listener.onAlterIndex(alterIndexEvent);
         }
       }
-      return;
     }
 
     @Override
@@ -3801,9 +4014,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       try {
         Table oldt = get_table_core(dbname, name);
         firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
-        alterHandler.alterTable(getMS(), wh, dbname, name, newTable, envContext);
+        alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
+                envContext, this);
         success = true;
-
         for (MetaStoreEventListener listener : listeners) {
           AlterTableEvent alterTableEvent =
               new AlterTableEvent(oldt, newTable, success, this);
@@ -4416,7 +4629,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         index.setCreateTime((int) time);
         index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
 
-        ms.addIndex(index);
+        if (ms.addIndex(index)) {
+          if (transactionalListeners.size() > 0) {
+            AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onAddIndex(addIndexEvent);
+            }
+          }
+        }
+
         success = ms.commitTransaction();
         return index;
       } finally {
@@ -4429,6 +4650,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           ms.rollbackTransaction();
         }
+
         for (MetaStoreEventListener listener : listeners) {
           AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this);
           listener.onAddIndex(addIndexEvent);
@@ -4506,6 +4728,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                 + qualified[0] + "." + qualified[1] + " for index " + indexName);
           }
         }
+
+        if (transactionalListeners.size() > 0) {
+          DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onDropIndex(dropIndexEvent);
+          }
+        }
+
         success = ms.commitTransaction();
       } finally {
         if (!success) {
@@ -5809,17 +6039,28 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
       Table tbl = null;
       Exception ex = null;
+      RawStore ms  = getMS();
+      boolean success = false;
       try {
+        ms.openTransaction();
         startPartitionFunction("markPartitionForEvent", db_name, tbl_name, partName);
         firePreEvent(new PreLoadPartitionDoneEvent(db_name, tbl_name, partName, this));
-        tbl = getMS().markPartitionForEvent(db_name, tbl_name, partName, evtType);
+        tbl = ms.markPartitionForEvent(db_name, tbl_name, partName, evtType);
         if (null == tbl) {
           throw new UnknownTableException("Table: " + tbl_name + " not found.");
-        } else {
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this));
+        }
+
+        if (transactionalListeners.size() > 0) {
+          LoadPartitionDoneEvent lpde = new LoadPartitionDoneEvent(true, tbl, partName, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onLoadPartitionDone(lpde);
           }
         }
+
+        success = ms.commitTransaction();
+        for (MetaStoreEventListener listener : listeners) {
+          listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this));
+        }
       } catch (Exception original) {
         ex = original;
         LOG.error("Exception caught in mark partition event ", original);
@@ -5839,7 +6080,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw newMetaException(original);
         }
       } finally {
-                endFunction("markPartitionForEvent", tbl != null, ex, tbl_name);
+        if (!success) {
+          ms.rollbackTransaction();
+        }
+
+        endFunction("markPartitionForEvent", tbl != null, ex, tbl_name);
       }
     }
 
@@ -6346,9 +6591,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(),
               rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(),
               rqst.isSuccessful(), this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onInsert(event);
+          }
+
           for (MetaStoreEventListener listener : listeners) {
             listener.onInsert(event);
           }
+
           return new FireEventResponse();
 
         default:


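A note on the recurring shape across these hunks: rather than firing notification listeners
only after commitTransaction(), where a listener failure could leave a committed DDL change
with no notification (the race the subject line describes), each write path now fires the
listeners configured via ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS while the RawStore
transaction is still open, and commits only if they all succeed. Below is a condensed sketch
of that pattern, modeled on the create_database_core hunk; the wrapper method name is
illustrative, and ms, db, transactionalListeners, and the event classes are assumed to be in
HMSHandler scope exactly as in the diff.

    // Sketch only, not part of the patch: the commit-ordering pattern the
    // diff applies at each DDL write path, shown in isolation.
    private void createDatabaseWithNotification(RawStore ms, Database db)
        throws InvalidObjectException, MetaException {
      boolean success = false;
      try {
        ms.openTransaction();
        ms.createDatabase(db);                 // the metadata write itself
        if (transactionalListeners.size() > 0) {
          CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
          for (MetaStoreEventListener listener : transactionalListeners) {
            listener.onCreateDatabase(cde);    // runs inside the open transaction;
          }                                    // an exception here skips the commit
        }
        success = ms.commitTransaction();      // DDL and notification persist together
      } finally {
        if (!success) {
          ms.rollbackTransaction();            // neither the DDL nor the event survives
        }
      }
    }

Registering DbNotificationListener under the transactional listener key rather than the
pre-existing hive.metastore.event.listeners key therefore makes the notification write and
the DDL change atomic: if the listener throws, the whole transaction rolls back and the DDL
fails loudly instead of silently losing its notification.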