asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities
Date Thu, 27 Jul 2017 06:35:56 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
index 6c20e9f..48f315b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
@@ -21,6 +21,8 @@ package org.apache.asterix.metadata.lock;
 import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.asterix.common.metadata.IMetadataLock;
+
 public class MetadataLock implements IMetadataLock {
     private final String key;
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -30,7 +32,7 @@ public class MetadataLock implements IMetadataLock {
     }
 
     @Override
-    public void acquire(IMetadataLock.Mode mode) {
+    public void lock(IMetadataLock.Mode mode) {
         switch (mode) {
             case WRITE:
                 lock.writeLock().lock();
@@ -42,7 +44,7 @@ public class MetadataLock implements IMetadataLock {
     }
 
     @Override
-    public void release(IMetadataLock.Mode mode) {
+    public void unlock(IMetadataLock.Mode mode) {
         switch (mode) {
             case WRITE:
                 lock.writeLock().unlock();
@@ -73,4 +75,9 @@ public class MetadataLock implements IMetadataLock {
         }
         return Objects.equals(key, ((MetadataLock) o).key);
     }
+
+    @Override
+    public String toString() {
+        return key;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
index 43d72e3..779fe2a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
@@ -18,17 +18,17 @@
  */
 package org.apache.asterix.metadata.lock;
 
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
+import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.metadata.entities.FeedConnection;
-import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.common.metadata.IMetadataLock;
+import org.apache.asterix.common.metadata.LockList;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
-public class MetadataLockManager {
+public class MetadataLockManager implements IMetadataLockManager {
 
-    public static final MetadataLockManager INSTANCE = new MetadataLockManager();
     private static final Function<String, MetadataLock> LOCK_FUNCTION = MetadataLock::new;
     private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = DatasetLock::new;
 
@@ -38,286 +38,175 @@ public class MetadataLockManager {
     private static final String DATASET_PREFIX = "Dataset:";
     private static final String FUNCTION_PREFIX = "Function:";
     private static final String NODE_GROUP_PREFIX = "NodeGroup:";
-    private static final String FEED_PREFIX = "Feed:";
+    private static final String ACTIVE_PREFIX = "Active:";
     private static final String FEED_POLICY_PREFIX = "FeedPolicy:";
     private static final String MERGE_POLICY_PREFIX = "MergePolicy:";
     private static final String DATATYPE_PREFIX = "DataType:";
     private static final String EXTENSION_PREFIX = "Extension:";
 
-    private MetadataLockManager() {
+    public MetadataLockManager() {
         mdlocks = new ConcurrentHashMap<>();
     }
 
-    // Dataverse
+    @Override
     public void acquireDataverseReadLock(LockList locks, String dataverseName) throws AsterixException {
         String key = DATAVERSE_PREFIX + dataverseName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
+    @Override
     public void acquireDataverseWriteLock(LockList locks, String dataverseName) throws AsterixException {
         String key = DATAVERSE_PREFIX + dataverseName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
-    // Dataset
+    @Override
     public void acquireDatasetReadLock(LockList locks, String datasetName) throws AsterixException {
         String key = DATASET_PREFIX + datasetName;
         DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
+    @Override
     public void acquireDatasetWriteLock(LockList locks, String datasetName) throws AsterixException {
         String key = DATASET_PREFIX + datasetName;
         DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
+    @Override
     public void acquireDatasetModifyLock(LockList locks, String datasetName) throws AsterixException {
         String key = DATASET_PREFIX + datasetName;
         DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.MODIFY, lock);
     }
 
+    @Override
     public void acquireDatasetCreateIndexLock(LockList locks, String datasetName) throws AsterixException {
-        String key = DATASET_PREFIX + datasetName;
-        DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
+        String dsKey = DATASET_PREFIX + datasetName;
+        DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(dsKey, DATASET_LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.INDEX_BUILD, lock);
     }
 
-    public void acquireExternalDatasetRefreshLock(LockList locks, String datasetName) throws AsterixException {
+    @Override
+    public void acquireDatasetExclusiveModificationLock(LockList locks, String datasetName) throws AsterixException {
         String key = DATASET_PREFIX + datasetName;
         DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
-        locks.add(IMetadataLock.Mode.INDEX_BUILD, lock);
+        locks.add(IMetadataLock.Mode.EXCLUSIVE_MODIFY, lock);
     }
 
-    // Function
+    @Override
     public void acquireFunctionReadLock(LockList locks, String functionName) throws AsterixException {
         String key = FUNCTION_PREFIX + functionName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
+    @Override
     public void acquireFunctionWriteLock(LockList locks, String functionName) throws AsterixException {
         String key = FUNCTION_PREFIX + functionName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
-    // Node Group
+    @Override
     public void acquireNodeGroupReadLock(LockList locks, String nodeGroupName) throws AsterixException {
         String key = NODE_GROUP_PREFIX + nodeGroupName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
+    @Override
     public void acquireNodeGroupWriteLock(LockList locks, String nodeGroupName) throws AsterixException {
         String key = NODE_GROUP_PREFIX + nodeGroupName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
-    // Feeds
-    public void acquireFeedReadLock(LockList locks, String feedName) throws AsterixException {
-        String key = FEED_PREFIX + feedName;
+    @Override
+    public void acquireActiveEntityReadLock(LockList locks, String entityName) throws AsterixException {
+        String key = ACTIVE_PREFIX + entityName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
-    public void acquireFeedWriteLock(LockList locks, String feedName) throws AsterixException {
-        String key = FEED_PREFIX + feedName;
+    @Override
+    public void acquireActiveEntityWriteLock(LockList locks, String entityName) throws AsterixException {
+        String key = ACTIVE_PREFIX + entityName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
+    @Override
     public void acquireFeedPolicyWriteLock(LockList locks, String feedPolicyName) throws AsterixException {
         String key = FEED_POLICY_PREFIX + feedPolicyName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
+    @Override
     public void acquireFeedPolicyReadLock(LockList locks, String feedPolicyName) throws AsterixException {
         String key = FEED_POLICY_PREFIX + feedPolicyName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
-    // CompactionPolicy
+    @Override
     public void acquireMergePolicyReadLock(LockList locks, String mergePolicyName) throws AsterixException {
         String key = MERGE_POLICY_PREFIX + mergePolicyName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
+    @Override
     public void acquireMergePolicyWriteLock(LockList locks, String mergePolicyName) throws AsterixException {
         String key = MERGE_POLICY_PREFIX + mergePolicyName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
-    // DataType
+    @Override
     public void acquireDataTypeReadLock(LockList locks, String datatypeName) throws AsterixException {
         String key = DATATYPE_PREFIX + datatypeName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
+    @Override
     public void acquireDataTypeWriteLock(LockList locks, String datatypeName) throws AsterixException {
         String key = DATATYPE_PREFIX + datatypeName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
-    // Extensions
-    public void acquireExtensionReadLock(LockList locks, String extensionName) throws AsterixException {
-        String key = EXTENSION_PREFIX + extensionName;
+    @Override
+    public void acquireExtensionReadLock(LockList locks, String extension, String entityName) throws AsterixException {
+        String key = EXTENSION_PREFIX + extension + entityName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.READ, lock);
     }
 
-    public void acquireExtensionWriteLock(LockList locks, String extensionName) throws AsterixException {
-        String key = EXTENSION_PREFIX + extensionName;
+    @Override
+    public void acquireExtensionWriteLock(LockList locks, String extension, String entityName) throws AsterixException {
+        String key = EXTENSION_PREFIX + extension + entityName;
         IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION);
         locks.add(IMetadataLock.Mode.WRITE, lock);
     }
 
-    public void createDatasetBegin(LockList locks, String dataverseName, String itemTypeDataverseName,
-            String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
-            String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
-            boolean isDefaultCompactionPolicy) throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        if (!dataverseName.equals(itemTypeDataverseName)) {
-            acquireDataverseReadLock(locks, itemTypeDataverseName);
-        }
-        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
-                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
-            acquireDataverseReadLock(locks, metaItemTypeDataverseName);
-        }
-        acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName);
-        if (metaItemTypeFullyQualifiedName != null
-                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
-            acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName);
-        }
-        if (nodeGroupName != null) {
-            acquireNodeGroupReadLock(locks, nodeGroupName);
-        }
-        if (!isDefaultCompactionPolicy) {
-            acquireMergePolicyReadLock(locks, compactionPolicyName);
-        }
-        acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
-    }
-
-    public void createIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName);
-    }
-
-    public void createTypeBegin(LockList locks, String dataverseName, String itemTypeFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName);
-    }
-
-    public void dropDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
-    }
-
-    public void dropIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
-    }
-
-    public void dropTypeBegin(LockList locks, String dataverseName, String dataTypeFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName);
-    }
-
-    public void functionStatementBegin(LockList locks, String dataverseName, String functionFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireFunctionWriteLock(locks, functionFullyQualifiedName);
-    }
-
-    public void modifyDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
-    }
-
-    public void insertDeleteUpsertBegin(LockList locks, String datasetFullyQualifiedName) throws AsterixException {
-        acquireDataverseReadLock(locks, DatasetUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName));
-        acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
-    }
-
-    public void dropFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireFeedWriteLock(locks, feedFullyQualifiedName);
-    }
-
-    public void dropFeedPolicyBegin(LockList locks, String dataverseName, String policyName) throws AsterixException {
-        acquireFeedWriteLock(locks, policyName);
-        acquireDataverseReadLock(locks, dataverseName);
-    }
-
-    public void startFeedBegin(LockList locks, String dataverseName, String feedName,
-            List<FeedConnection> feedConnections) throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireFeedReadLock(locks, feedName);
-        for (FeedConnection feedConnection : feedConnections) {
-            // what if the dataset is in a different dataverse
-            String fqName = dataverseName + "." + feedConnection.getDatasetName();
-            acquireDatasetReadLock(locks, fqName);
-        }
-    }
-
-    public void stopFeedBegin(LockList locks, String dataverseName, String feedName) throws AsterixException {
-        // TODO: dataset lock?
-        // Dataset locks are not required here since datasets are protected by the active event listener
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireFeedReadLock(locks, feedName);
-    }
-
-    public void createFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireFeedWriteLock(locks, feedFullyQualifiedName);
-    }
-
-    public void connectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetReadLock(locks, datasetFullyQualifiedName);
-        acquireFeedReadLock(locks, feedFullyQualifiedName);
-    }
-
-    public void createFeedPolicyBegin(LockList locks, String dataverseName, String policyName) throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireFeedPolicyWriteLock(locks, policyName);
-    }
-
-    public void disconnectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetReadLock(locks, datasetFullyQualifiedName);
-        acquireFeedReadLock(locks, feedFullyQualifiedName);
-    }
-
-    public void compactBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+    @Override
+    public void upgradeDatasetLockToWrite(LockList locks, String fullyQualifiedName) throws AlgebricksException {
+        String key = DATASET_PREFIX + fullyQualifiedName;
+        DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
+        locks.upgrade(IMetadataLock.Mode.UPGRADED_WRITE, lock);
     }
 
-    public void refreshDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName)
-            throws AsterixException {
-        acquireDataverseReadLock(locks, dataverseName);
-        acquireExternalDatasetRefreshLock(locks, datasetFullyQualifiedName);
+    @Override
+    public void downgradeDatasetLockToExclusiveModify(LockList locks, String fullyQualifiedName)
+            throws AlgebricksException {
+        String key = DATASET_PREFIX + fullyQualifiedName;
+        DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION);
+        locks.downgrade(IMetadataLock.Mode.EXCLUSIVE_MODIFY, lock);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 154e1b5..e4a6ca8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -34,14 +34,15 @@ import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -51,7 +52,6 @@ import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.types.ARecordType;
@@ -283,22 +283,22 @@ public class DatasetUtil {
             metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
         }
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                .getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
             sb.append(fs[i] + " ");
         }
         LOGGER.info("CREATING File Splits: " + sb.toString());
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
         IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
                 compactionInfo.first, compactionInfo.second);
-        IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(
-                metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first,
-                resourceFactory, !dataset.isTemp());
+        IndexBuilderFactory indexBuilderFactory =
+                new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                        splitsAndConstraint.first, resourceFactory, !dataset.isTemp());
         IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);
@@ -344,8 +344,8 @@ public class DatasetUtil {
      */
     public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
             Dataset dataset, JobId jobId) throws AlgebricksException {
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
         // -Infinity
@@ -396,8 +396,8 @@ public class DatasetUtil {
         try {
             Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
                     dataset.getDatasetName());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                    .getSplitProviderAndConstraints(dataset);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    metadataProvider.getSplitProviderAndConstraints(dataset);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -405,8 +405,8 @@ public class DatasetUtil {
             for (int i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
-            boolean hasSecondaries = metadataProvider
-                    .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
+            boolean hasSecondaries =
+                    metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
             IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
@@ -426,8 +426,8 @@ public class DatasetUtil {
             f++;
             // add the previous meta second
             if (dataset.hasMetaPart()) {
-                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                        .getSerializerDeserializer(metaItemType);
+                outputSerDes[f] =
+                        FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
                 outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
                 f++;
             }
@@ -477,8 +477,8 @@ public class DatasetUtil {
      */
     public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
             MetadataProvider metadataProvider) throws AlgebricksException {
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
 
         // Build dummy tuple containing one field with a dummy value inside.
@@ -506,9 +506,8 @@ public class DatasetUtil {
         return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a .
     }
 
-    public static String getDataverseFromFullyQualifiedName(String datasetName) {
-        int idx = datasetName.indexOf('.');
-        return datasetName.substring(0, idx);
+    public static String getFullyQualifiedName(Dataset dataset) {
+        return dataset.getDataverseName() + '.' + dataset.getDatasetName();
     }
 
     /***
@@ -548,13 +547,14 @@ public class DatasetUtil {
      */
     public static String createNodeGroupForNewDataset(String dataverseName, String datasetName, long rebalanceCount,
             Set<String> ncNames, MetadataProvider metadataProvider) throws Exception {
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         String nodeGroup = dataverseName + "." + datasetName + (rebalanceCount == 0L ? "" : "_" + rebalanceCount);
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
+        appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
         NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup);
         if (ng != null) {
             nodeGroup = nodeGroup + "_" + UUID.randomUUID().toString();
-            MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
+            appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
         }
         MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodeGroup, new ArrayList<>(ncNames)));
         return nodeGroup;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
new file mode 100644
index 0000000..4cf25f7
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.LockList;
+
+/**
+ * Static helpers that acquire, in a canonical order, all metadata locks a DDL/DML
+ * statement needs before it starts executing.  Every method locks the dataverse
+ * first and only then the finer-grained entity locks, so concurrent statements
+ * always acquire locks in the same global order and cannot deadlock against each
+ * other.  Acquired locks are registered in the supplied {@code locks} list and are
+ * released by the caller when the statement completes.
+ */
+public class MetadataLockUtil {
+
+    private MetadataLockUtil() {
+        // utility class; not meant to be instantiated
+    }
+
+    public static void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String itemTypeDataverseName, String itemTypeFullyQualifiedName, String metaItemTypeDataverseName,
+            String metaItemTypeFullyQualifiedName, String nodeGroupName, String compactionPolicyName,
+            String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        // The item type (and the optional meta type) may live in other dataverses;
+        // lock each distinct dataverse exactly once.
+        if (!dataverseName.equals(itemTypeDataverseName)) {
+            lockMgr.acquireDataverseReadLock(locks, itemTypeDataverseName);
+        }
+        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
+                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
+            lockMgr.acquireDataverseReadLock(locks, metaItemTypeDataverseName);
+        }
+        lockMgr.acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName);
+        if (metaItemTypeFullyQualifiedName != null
+                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
+            lockMgr.acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName);
+        }
+        if (nodeGroupName != null) {
+            lockMgr.acquireNodeGroupReadLock(locks, nodeGroupName);
+        }
+        // The default merge policy is not a metadata entity, so there is nothing to lock.
+        if (!isDefaultCompactionPolicy) {
+            lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName);
+        }
+        lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void createIndexBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void dropIndexBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void createTypeBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String itemTypeFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName);
+    }
+
+    public static void dropDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void dropTypeBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String dataTypeFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName);
+    }
+
+    public static void functionStatementBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String functionFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireFunctionWriteLock(locks, functionFullyQualifiedName);
+    }
+
+    public static void modifyDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList locks,
+            String datasetFullyQualifiedName) throws AsterixException {
+        // Only the qualified dataset name is passed in; derive the dataverse from it.
+        lockMgr.acquireDataverseReadLock(locks,
+                MetadataUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName));
+        lockMgr.acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void dropFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String feedFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireActiveEntityWriteLock(locks, feedFullyQualifiedName);
+    }
+
+    public static void dropFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String policyName) throws AsterixException {
+        // Lock the dataverse first to preserve the global lock order (dataverse ->
+        // entity) used by every other method in this class; the reversed order here
+        // could deadlock against a concurrent statement on the same dataverse.
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        // Guard the policy with the feed-policy lock so create and drop of the same
+        // policy serialize on the same lock (createFeedPolicyBegin uses it too); the
+        // active-entity lock used before protected a different entity.
+        lockMgr.acquireFeedPolicyWriteLock(locks, policyName);
+    }
+
+    public static void startFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String feedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireActiveEntityReadLock(locks, feedName);
+    }
+
+    public static void stopFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String feedName) throws AsterixException {
+        // TODO: dataset lock?
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireActiveEntityReadLock(locks, feedName);
+    }
+
+    public static void createFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String feedFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireActiveEntityWriteLock(locks, feedFullyQualifiedName);
+    }
+
+    public static void connectFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName, String feedFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireActiveEntityReadLock(locks, feedFullyQualifiedName);
+        lockMgr.acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void createFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String policyName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireFeedPolicyWriteLock(locks, policyName);
+    }
+
+    public static void disconnectFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName, String feedFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireActiveEntityReadLock(locks, feedFullyQualifiedName);
+        lockMgr.acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void compactBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        lockMgr.acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+    }
+
+    public static void refreshDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String datasetFullyQualifiedName) throws AsterixException {
+        lockMgr.acquireDataverseReadLock(locks, dataverseName);
+        // Exclusive-modification (not plain write) lock: refresh must block other
+        // modifications while still allowing readers.
+        lockMgr.acquireDatasetExclusiveModificationLock(locks, datasetFullyQualifiedName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
index 3133aba..e5d4721 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java
@@ -38,4 +38,9 @@ public class MetadataUtil {
                 return "Unknown Pending Operation";
         }
     }
+
+    /**
+     * Extracts the dataverse part (the text before the first '.') from a fully
+     * qualified "dataverse.dataset" name.
+     *
+     * @param datasetName a fully qualified dataset name; must contain a '.'
+     * @return the dataverse prefix of {@code datasetName}
+     * @throws IllegalArgumentException if {@code datasetName} contains no '.' —
+     *             previously this surfaced as an opaque
+     *             StringIndexOutOfBoundsException from substring(0, -1)
+     */
+    public static String getDataverseFromFullyQualifiedName(String datasetName) {
+        int idx = datasetName.indexOf('.');
+        if (idx < 0) {
+            throw new IllegalArgumentException("Not a fully qualified dataset name: " + datasetName);
+        }
+        return datasetName.substring(0, idx);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index e634d4e..6825f10 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -24,11 +24,12 @@ import java.util.List;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -59,12 +60,12 @@ public class SplitsAndConstraintsUtil {
     public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException {
         try {
-            List<String> nodeGroup =
-                    MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
+            NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName());
             if (nodeGroup == null) {
                 throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
             }
-            return getIndexSplits(dataset, indexName, nodeGroup);
+            List<String> nodeList = nodeGroup.getNodeNames();
+            return getIndexSplits(dataset, indexName, nodeList);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
@@ -85,11 +86,10 @@ public class SplitsAndConstraintsUtil {
 
             for (int k = 0; k < numPartitions; k++) {
                 // format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
-                File f = new File(
-                        StoragePathUtil.prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
-                                + (dataset.isTemp() ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)
-                                        : "")
-                                + File.separator + relPathFile);
+                File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
+                        nodePartitions[k].getPartitionId())
+                        + (dataset.isTemp() ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER) : "")
+                        + File.separator + relPathFile);
                 splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath()));
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java
index 5bedffa..ff65994 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java
@@ -21,8 +21,8 @@ package org.apache.asterix.metadata.valueextractors;
 
 import java.rmi.RemoteException;
 
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.api.IMetadataEntityTupleTranslator;
 import org.apache.asterix.metadata.api.IValueExtractor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java
index 32bed0d..5f0525b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java
@@ -24,8 +24,8 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.api.IValueExtractor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java
index fcf69d5..1928d7e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java
@@ -21,8 +21,8 @@ package org.apache.asterix.metadata.valueextractors;
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.api.IValueExtractor;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.exceptions.HyracksDataException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
index 6f3bd30..e5d0d7d 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
index fe76c6b..cdbaad3 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
@@ -30,7 +30,7 @@ import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Datatype;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 4cd243c..28c480f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -21,6 +21,8 @@ package org.apache.asterix.runtime.utils;
 import java.io.IOException;
 import java.util.function.Supplier;
 
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
@@ -74,12 +76,13 @@ public class CcApplicationContext implements ICcApplicationContext {
     private Object extensionManager;
     private IFaultToleranceStrategy ftStrategy;
     private IJobLifecycleListener activeLifeCycleListener;
+    private IMetadataLockManager mdLockManager;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
             IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener,
-            IStorageComponentProvider storageComponentProvider)
+            IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager)
             throws AsterixException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
@@ -105,6 +108,7 @@ public class CcApplicationContext implements ICcApplicationContext {
         this.metadataBootstrapSupplier = metadataBootstrapSupplier;
         this.globalRecoveryManager = globalRecoveryManager;
         this.storageComponentProvider = storageComponentProvider;
+        this.mdLockManager = mdLockManager;
     }
 
     @Override
@@ -210,7 +214,7 @@ public class CcApplicationContext implements ICcApplicationContext {
     }
 
     @Override
-    public IJobLifecycleListener getActiveLifecycleListener() {
+    public IJobLifecycleListener getActiveNotificationHandler() {
         return activeLifeCycleListener;
     }
 
@@ -218,4 +222,14 @@ public class CcApplicationContext implements ICcApplicationContext {
     public IStorageComponentProvider getStorageComponentProvider() {
         return storageComponentProvider;
     }
+
+    @Override
+    public IMetadataLockManager getMetadataLockManager() {
+        return mdLockManager;
+    }
+
+    @Override
+    public IClusterStateManager getClusterStateManager() {
+        return ClusterStateManager.INSTANCE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index da341af..4717a7b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -66,17 +66,11 @@ public class ClusterStateManager implements IClusterStateManager {
     public static final ClusterStateManager INSTANCE = new ClusterStateManager();
     private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>();
     private Set<String> pendingRemoval = new HashSet<>();
-
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
-
     private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
-
-    private boolean globalRecoveryCompleted = false;
-
     private Map<String, ClusterPartition[]> node2PartitionsMap;
     private SortedMap<Integer, ClusterPartition> clusterPartitions;
-
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
     private Set<String> failedNodes = new HashSet<>();
@@ -117,7 +111,9 @@ public class ClusterStateManager implements IClusterStateManager {
 
     @Override
     public synchronized void setState(ClusterState state) {
+        LOGGER.info("updating cluster state from " + this.state + " to " + state.name());
         this.state = state;
+        appCtx.getGlobalRecoveryManager().notifyStateChange(state);
         LOGGER.info("Cluster State is now " + state.name());
         // Notify any waiting threads for the cluster state to change.
         notifyAll();
@@ -262,16 +258,8 @@ public class ClusterStateManager implements IClusterStateManager {
                 clusterActiveLocations.add(p.getActiveNodeId());
             }
         }
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
-                clusterActiveLocations.toArray(new String[] {}));
-    }
-
-    public boolean isGlobalRecoveryCompleted() {
-        return globalRecoveryCompleted;
-    }
-
-    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
-        this.globalRecoveryCompleted = globalRecoveryCompleted;
+        clusterPartitionConstraint =
+                new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
     }
 
     public boolean isClusterActive() {
@@ -384,13 +372,12 @@ public class ClusterStateManager implements IClusterStateManager {
 
     @Override
     public synchronized void deregisterNodePartitions(String nodeId) {
-        ClusterPartition [] nodePartitions = node2PartitionsMap.remove(nodeId);
+        ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
         if (nodePartitions == null) {
             LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " +
-                        Arrays.toString(nodePartitions));
+                LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
             }
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
@@ -413,7 +400,7 @@ public class ClusterStateManager implements IClusterStateManager {
             LOGGER.info("Deregistering intention to remove node id " + nodeId);
         }
         if (!pendingRemoval.remove(nodeId)) {
-            LOGGER.warning("Cannot deregister intention to remove node id " + nodeId  + " that was not registered");
+            LOGGER.warning("Cannot deregister intention to remove node id " + nodeId + " that was not registered");
             return false;
         } else {
             return true;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java
index e4a94a9..3b2d0ce 100644
--- a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java
+++ b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java
@@ -47,13 +47,13 @@ import org.apache.asterix.common.annotations.TypeDataGen;
 import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.aql.parser.ParseException;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java
index a525b1b..94389b2 100644
--- a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java
+++ b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java
@@ -24,10 +24,10 @@ import java.util.Map;
 
 import org.apache.asterix.common.annotations.TypeDataGen;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.TypeDecl;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index 443edf1..f5695a5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -46,7 +46,7 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationC
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
             log(pkHash, after, before);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index 3119ddd..008f0be 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -24,21 +24,23 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager extends IDatasetManager {
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+    IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
             boolean asyncMode, int partition, int nPartitions) throws HyracksException;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+    void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             boolean orderedResult, boolean emptyResult) throws HyracksException;
 
-    public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
-            throws HyracksException;
+    void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
 
-    public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
+    void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
             throws HyracksException;
 
-    public void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+    void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+
+    void abortReader(JobId jobId);
+
+    void abortAllReaders();
 
-    public void abortReader(JobId jobId);
+    void close();
 
-    public void close();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index c9cdb2d..0a99ea6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -53,11 +53,6 @@ public class HyracksDataException extends HyracksException {
         return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
     }
 
-    public static HyracksDataException create(HyracksDataException e, String nodeId) {
-        return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId,
-                e.getParams());
-    }
-
     public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
         if (root == null) {
             return HyracksDataException.create(th);
@@ -124,7 +119,7 @@ public class HyracksDataException extends HyracksException {
     }
 
     public HyracksDataException(String component, int errorCode, Throwable cause, Serializable... params) {
-        super(component, errorCode, cause.getMessage(), cause, null, params);
+        super(component, errorCode, cause, params);
     }
 
     public HyracksDataException(String component, int errorCode, String message, Throwable cause,
@@ -132,4 +127,8 @@ public class HyracksDataException extends HyracksException {
         super(component, errorCode, message, cause, null, params);
     }
 
+    public static HyracksDataException create(HyracksDataException e, String nodeId) {
+        return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId,
+                e.getParams());
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 30ffebe..338c331 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -18,12 +18,38 @@
  */
 package org.apache.hyracks.api.job;
 
+import java.util.List;
+
 import org.apache.hyracks.api.exceptions.HyracksException;
 
+/**
+ * A listener for job related events
+ */
 public interface IJobLifecycleListener {
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+    /**
+     * Notify the listener that a job has been created
+     *
+     * @param jobId
+     * @param spec
+     * @throws HyracksException
+     */
+    void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
 
-    public void notifyJobStart(JobId jobId) throws HyracksException;
+    /**
+     * Notify the listener that the job has started on the cluster controller
+     *
+     * @param jobId
+     * @throws HyracksException
+     */
+    void notifyJobStart(JobId jobId) throws HyracksException;
 
-    public void notifyJobFinish(JobId jobId) throws HyracksException;
+    /**
+     * Notify the listener that the job has been terminated, passing exceptions in case of failure
+     *
+     * @param jobId
+     * @param jobStatus
+     * @param exceptions
+     * @throws HyracksException
+     */
+    void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 627f406..e1c218f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -167,7 +167,7 @@ public class ClusterControllerService implements IControllerService {
         threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
 
         // Node manager is in charge of cluster membership management.
-        nodeManager = new NodeManager(ccConfig, resourceManager);
+        nodeManager = new NodeManager(this, ccConfig, resourceManager);
 
         jobIdFactory = new JobIdFactory();
     }
@@ -193,9 +193,9 @@ public class ClusterControllerService implements IControllerService {
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
-        clientIPC = new IPCSystem(
-                new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
-                ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+        clientIPC =
+                new IPCSystem(new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
+                        ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this, ccConfig.getConsoleListenPort());
         clusterIPC.start();
         clientIPC.start();
@@ -222,9 +222,9 @@ public class ClusterControllerService implements IControllerService {
 
         // Job manager is in charge of job lifecycle management.
         try {
-            Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
-                    .loadClass(ccConfig.getJobManagerClass())
-                    .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+            Constructor<?> jobManagerConstructor =
+                    this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor(
+                            CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
             jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
@@ -263,13 +263,14 @@ public class ClusterControllerService implements IControllerService {
             @Override
             public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
                 // no-op, we don't care
+                LOGGER.log(Level.WARNING, "Getting notified that node: " + nodeId + " has joined. and we don't care");
             }
 
             @Override
             public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
+                LOGGER.log(Level.WARNING, "Getting notified that nodes: " + deadNodeIds + " has failed");
                 for (String nodeId : deadNodeIds) {
                     Pair<String, Integer> ncService = getNCService(nodeId);
-
                     final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
                             ncService.getLeft(), ncService.getRight(), nodeId);
                     workQueue.schedule(triggerWork);
@@ -396,8 +397,8 @@ public class ClusterControllerService implements IControllerService {
 
         @Override
         public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
-            GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(
-                    ClusterControllerService.this.getNodeManager(), map);
+            GetIpAddressNodeNameMapWork ginmw =
+                    new GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(), map);
             try {
                 workQueue.scheduleAndSync(ginmw);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 5075081..26245e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
@@ -88,14 +89,14 @@ public class CCServiceContext extends ServiceContext implements ICCServiceContex
         }
     }
 
-    public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobFinish(jobId);
+            l.notifyJobFinish(jobId, jobStatus, exceptions);
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec)
-            throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
             l.notifyJobCreation(jobId, spec);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 47e78a3..8cca1e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -38,19 +38,24 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 
 public class NodeManager implements INodeManager {
     private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName());
 
+    private final ClusterControllerService ccs;
     private final CCConfig ccConfig;
     private final IResourceManager resourceManager;
     private final Map<String, NodeControllerState> nodeRegistry;
     private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
 
-    public NodeManager(CCConfig ccConfig, IResourceManager resourceManager) {
+    public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager) {
+        this.ccs = ccs;
         this.ccConfig = ccConfig;
         this.resourceManager = resourceManager;
         this.nodeRegistry = new LinkedHashMap<>();
@@ -79,15 +84,18 @@ public class NodeManager implements INodeManager {
 
     @Override
     public void addNode(String nodeId, NodeControllerState ncState) throws HyracksException {
+        LOGGER.warning("addNode(" + nodeId + ") called");
         if (nodeId == null || ncState == null) {
             throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
         }
         // Updates the node registry.
         if (nodeRegistry.containsKey(nodeId)) {
-            LOGGER.warning("Node with name " + nodeId + " has already registered; re-registering");
+            LOGGER.warning(
+                    "Node with name " + nodeId + " has already registered; failing the node then re-registering.");
+            removeDeadNode(nodeId);
         }
+        LOGGER.warning("adding node to registry");
         nodeRegistry.put(nodeId, ncState);
-
         // Updates the IP address to node names map.
         try {
             InetAddress ipAddress = getIpAddress(ncState);
@@ -98,8 +106,8 @@ public class NodeManager implements INodeManager {
             nodeRegistry.remove(nodeId);
             throw e;
         }
-
         // Updates the cluster capacity.
+        LOGGER.warning("updating cluster capacity");
         resourceManager.update(nodeId, ncState.getCapacity());
     }
 
@@ -147,6 +155,27 @@ public class NodeManager implements INodeManager {
         return Pair.of(deadNodes, affectedJobIds);
     }
 
+    public void removeDeadNode(String nodeId) throws HyracksException {
+        NodeControllerState state = nodeRegistry.get(nodeId);
+        Set<JobId> affectedJobIds = state.getActiveJobIds();
+        // Removes the node from node map.
+        nodeRegistry.remove(nodeId);
+        // Removes the node from IP map.
+        removeNodeFromIpAddressMap(nodeId, state);
+        // Updates the cluster capacity.
+        resourceManager.update(nodeId, new NodeCapacity(0L, 0));
+        LOGGER.info(nodeId + " considered dead");
+        IJobManager jobManager = ccs.getJobManager();
+        Set<String> collection = Collections.singleton(nodeId);
+        for (JobId jobId : affectedJobIds) {
+            JobRun run = jobManager.get(jobId);
+            if (run != null) {
+                run.getExecutor().notifyNodeFailures(collection);
+            }
+        }
+        ccs.getContext().notifyNodeFailure(collection);
+    }
+
     @Override
     public void apply(NodeFunction nodeFunction) {
         nodeRegistry.forEach(nodeFunction::apply);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 8401fcf..2685f60 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.PreDistributedJobStore;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -69,7 +70,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         this.preDistributedJobStore = preDistributedJobStore;
-        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
+        jobResultLocations = new LinkedHashMap<>();
     }
 
     @Override
@@ -94,7 +95,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
         // Auto-generated method stub
     }
 
@@ -179,7 +180,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 
     @Override
     public synchronized long getResultTimestamp(JobId jobId) {
-        if (preDistributedJobStore.jobIsPredistributed(jobId)){
+        if (preDistributedJobStore.jobIsPredistributed(jobId)) {
             return -1;
         }
         return getState(jobId).getTimestamp();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 45c7711..c1a7899 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -132,8 +132,8 @@ public class JobManager implements IJobManager {
         // Removes a pending job.
         JobRun jobRun = jobQueue.remove(jobId);
         if (jobRun != null) {
-            List<Exception> exceptions = Collections
-                    .singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
+            List<Exception> exceptions =
+                    Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
             // Since the job has not been executed, we only need to update its status and lifecyle here.
             jobRun.setStatus(JobStatus.FAILURE, exceptions);
             runMapArchive.put(jobId, jobRun);
@@ -179,7 +179,7 @@ public class JobManager implements IJobManager {
                 } catch (Exception e) {
                     LOGGER.log(Level.SEVERE, e.getMessage(), e);
                     if (caughtException == null) {
-                        caughtException = new HyracksException(e);
+                        caughtException = HyracksException.create(e);
                     } else {
                         caughtException.addSuppressed(e);
                     }
@@ -208,7 +208,7 @@ public class JobManager implements IJobManager {
         CCServiceContext serviceCtx = ccs.getContext();
         if (serviceCtx != null) {
             try {
-                serviceCtx.notifyJobFinish(jobId);
+                serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
             } catch (HyracksException e) {
                 LOGGER.log(Level.SEVERE, e.getMessage(), e);
                 caughtException = e;
@@ -249,8 +249,6 @@ public class JobManager implements IJobManager {
         }
     }
 
-
-
     @Override
     public Collection<JobRun> getRunningJobs() {
         return activeRunMap.values();
@@ -320,9 +318,8 @@ public class JobManager implements IJobManager {
         try {
             run.getExecutor().startJob();
         } catch (Exception e) {
-            ccs.getWorkQueue()
-                    .schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
-                            Collections.singletonList(e)));
+            ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
+                    Collections.singletonList(e)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index f44fca0..bf0846f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -54,6 +54,7 @@ public class RegisterNodeWork extends SynchronizableWork {
         CCNCFunctions.NodeRegistrationResult result;
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
+            LOGGER.log(Level.WARNING, "Registering INodeController: id = " + id);
             INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
             NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
@@ -71,9 +72,12 @@ public class RegisterNodeWork extends SynchronizableWork {
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
             ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
         } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Node registration failed", e);
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }
+        LOGGER.warning("sending registration response to node");
         ncIPCHandle.send(-1, result, null);
+        LOGGER.warning("notifying node join");
         ccs.getContext().notifyNodeJoin(id, ncConfiguration);
     }
 }


Mime
View raw message