asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject incubator-asterixdb git commit: Asterix MessageBroker implementation
Date Wed, 23 Dec 2015 02:30:42 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 8b41d0bc2 -> fc7272c0c


Asterix MessageBroker implementation

This change includes the following:
- Add implementation for CC/NC MessageBroker.
- Implement GlobalResourceIdFactory using MessageBroker.
- Change resource id factory to GlobalResourceIdFactory.
- Refactor metadata indexes fixed properties.
- Use fixed resource ids for metadata indexes.

Change-Id: If4320e2c5a0130d2f86a4be6ae61f5cee43e30af
Reviewed-on: https://asterix-gerrit.ics.uci.edu/486
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/fc7272c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/fc7272c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/fc7272c0

Branch: refs/heads/master
Commit: fc7272c0cff6d2f5c53278eba4aea889b5ed1c06
Parents: 8b41d0b
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Tue Dec 22 17:14:53 2015 -0800
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Tue Dec 22 18:26:20 2015 -0800

----------------------------------------------------------------------
 .../api/common/AsterixAppRuntimeContext.java    |  24 ++--
 ...rixAppRuntimeContextProdiverForRecovery.java |   4 +-
 .../bootstrap/CCApplicationEntryPoint.java      |   6 +
 .../bootstrap/NCApplicationEntryPoint.java      |  15 ++-
 .../asterix/messaging/CCMessageBroker.java      | 109 ++++++++++++++++
 .../asterix/messaging/NCMessageBroker.java      |  94 ++++++++++++++
 asterix-common/pom.xml                          |  10 +-
 .../common/api/IAsterixAppRuntimeContext.java   |   3 +-
 .../messaging/AbstractApplicationMessage.java   |  36 ++++++
 .../messaging/ReportMaxResourceIdMessage.java   |  37 ++++++
 .../ReportMaxResourceIdRequestMessage.java      |  37 ++++++
 .../messaging/ResourceIdRequestMessage.java     |  28 ++++
 .../ResourceIdRequestResponseMessage.java       |  47 +++++++
 .../messaging/api/IApplicationMessage.java      |  47 +++++++
 .../api/IApplicationMessageCallback.java        |  30 +++++
 .../common/messaging/api/INCMessageBroker.java  |  40 ++++++
 .../IAsterixAppRuntimeContextProvider.java      |   4 +-
 .../apache/asterix/metadata/MetadataNode.java   |   5 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |   9 +-
 .../metadata/bootstrap/MetadataIndex.java       |  17 +--
 .../MetadataIndexImmutableProperties.java       |  79 ++++++++++++
 .../bootstrap/MetadataPrimaryIndexes.java       | 127 +++++++++----------
 .../bootstrap/MetadataSecondaryIndexes.java     |  27 ++--
 .../om/util/AsterixClusterProperties.java       |  12 ++
 .../resource/GlobalResourceIdFactory.java       |  76 +++++++++++
 .../GlobalResourceIdFactoryProvider.java        |  34 +++++
 .../AsterixRuntimeComponentsProvider.java       |   4 +-
 27 files changed, 838 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index ce19139..45c0598 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -49,12 +49,13 @@ import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.feeds.FeedManager;
-import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.asterix.replication.management.ReplicationManager;
 import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -81,8 +82,7 @@ import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
-import org.apache.hyracks.storage.common.file.ResourceIdFactoryProvider;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
 
@@ -118,7 +118,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
     private ILSMIOOperationScheduler lsmIOScheduler;
     private ILocalResourceRepository localResourceRepository;
-    private ResourceIdFactory resourceIdFactory;
+    private IResourceIdFactory resourceIdFactory;
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
@@ -158,13 +158,14 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
+        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
+                ioManager, ncApplicationContext.getNodeId());
+        localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
+
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
                 this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
-        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId());
-        localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
 
         IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
         SystemState systemState = recoveryMgr.getSystemState();
@@ -175,7 +176,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         initializeResourceIdFactory();
 
         datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
-                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
+                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
 
         isShuttingdown = false;
 
@@ -278,7 +279,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         return localResourceRepository;
     }
 
-    public ResourceIdFactory getResourceIdFactory() {
+    public IResourceIdFactory getResourceIdFactory() {
         return resourceIdFactory;
     }
 
@@ -376,6 +377,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
     @Override
     public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
+                .createResourceIdFactory();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
index 570c3c9..b975970 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
@@ -32,7 +32,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContextProdiverForRecovery implements IAsterixAppRuntimeContextProvider {
 
@@ -78,7 +78,7 @@ public class AsterixAppRuntimeContextProdiverForRecovery implements IAsterixAppR
     }
 
     @Override
-    public ResourceIdFactory getResourceIdFactory() {
+    public IResourceIdFactory getResourceIdFactory() {
         return asterixAppRuntimeContext.getResourceIdFactory();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 80ce5ea..d2164f4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -43,6 +43,7 @@ import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.event.service.ILookupService;
 import org.apache.asterix.feeds.CentralFeedManager;
 import org.apache.asterix.feeds.FeedLifecycleListener;
+import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
@@ -55,6 +56,8 @@ import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -72,9 +75,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     private static IAsterixStateProxy proxy;
     private ICCApplicationContext appCtx;
+    private IMessageBroker messageBroker;
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+        messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
         this.appCtx = ccAppCtx;
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -118,6 +123,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         }
 
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
+        ccAppCtx.setMessageBroker(messageBroker);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 2cd6a1a..147a356 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -34,12 +34,13 @@ import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.event.schema.cluster.SubstituteNodes;
+import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -54,6 +55,8 @@ import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -75,6 +78,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private SystemState systemState = SystemState.NEW_UNIVERSE;
     private boolean performedRemoteRecovery = false;
     private boolean replicationEnabled = false;
+    private IMessageBroker messageBroker;
 
     @Override
     public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
@@ -91,6 +95,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
         ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
+        messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService());
+        ncApplicationContext.setMessageBroker(messageBroker);
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -191,6 +197,9 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
     @Override
     public void notifyStartupComplete() throws Exception {
+        //send max resource id on this NC to the CC
+        ((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
+
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
 
@@ -250,8 +259,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Configured:" + lccm);
         }
-        ncApplicationContext.setStateDumpHandler(new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm
-                .getDumpPath(), lccm));
+        ncApplicationContext.setStateDumpHandler(
+                new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm));
 
         lccm.startAll();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
new file mode 100644
index 0000000..095ef1b
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+
+public class CCMessageBroker implements IMessageBroker {
+
+    private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
+    private final AtomicLong globalResourceId = new AtomicLong(0);
+    private final ClusterControllerService ccs;
+    private final Set<String> nodesReportedMaxResourceId = new HashSet<>();
+    public static final long NO_CALLBACK_MESSAGE_ID = -1;
+
+    public CCMessageBroker(ClusterControllerService ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public void receivedMessage(IMessage message, String nodeId) throws Exception {
+        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        switch (absMessage.getMessageType()) {
+            case RESOURCE_ID_REQUEST:
+                handleResourceIdRequest(message, nodeId);
+                break;
+            case REPORT_MAX_RESOURCE_ID_RESPONSE:
+                handleReportResourceMaxIdResponse(message, nodeId);
+                break;
+            default:
+                LOGGER.warning("Unknown message: " + absMessage.getMessageType());
+                break;
+        }
+    }
+
+    private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
+        ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
+        ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
+        reponse.setId(msg.getId());
+        //cluster is not active
+        if (!AsterixClusterProperties.isClusterActive()) {
+            reponse.setResourceId(-1);
+            reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+        } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) {
+            //some node has not reported max resource id
+            reponse.setResourceId(-1);
+            reponse.setException(new Exception("One or more nodes has not reported max resource id."));
+            requestMaxResourceID();
+        } else {
+            reponse.setResourceId(globalResourceId.incrementAndGet());
+        }
+        sendApplicationMessageToNC(reponse, nodeId);
+    }
+
+    private synchronized void handleReportResourceMaxIdResponse(IMessage message, String nodeId) throws Exception {
+        ReportMaxResourceIdMessage msg = (ReportMaxResourceIdMessage) message;
+        globalResourceId.set(Math.max(msg.getMaxResourceId(), globalResourceId.get()));
+        nodesReportedMaxResourceId.add(nodeId);
+    }
+
+    private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+        NodeControllerState state = nodeMap.get(nodeId);
+        state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+    }
+
+    private void requestMaxResourceID() throws Exception {
+        //send request to NCs that have not reported their max resource ids
+        Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+        ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+        msg.setId(NO_CALLBACK_MESSAGE_ID);
+        for (String nodeId : getParticipantNodes) {
+            if (!nodesReportedMaxResourceId.contains(nodeId)) {
+                sendApplicationMessageToNC(msg, nodeId);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
new file mode 100644
index 0000000..001771e
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class NCMessageBroker implements INCMessageBroker {
+    private final NodeControllerService ncs;
+    private final AtomicLong messageId = new AtomicLong(0);
+    private final Map<Long, IApplicationMessageCallback> callbacks;
+
+    public NCMessageBroker(NodeControllerService ncs) {
+        this.ncs = ncs;
+        callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+    }
+
+    @Override
+    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
+        if (callback != null) {
+            long uniqueMessageId = messageId.incrementAndGet();
+            message.setId(uniqueMessageId);
+            callbacks.put(uniqueMessageId, callback);
+        }
+        try {
+            ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
+        } catch (Exception e) {
+            if (callback != null) {
+                //remove the callback in case of failure
+                callbacks.remove(message.getId());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void receivedMessage(IMessage message, String nodeId) throws Exception {
+        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        //if the received message is a response to a sent message, deliver it to the sender
+        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+        if (callback != null) {
+            callback.deliverMessageResponse(absMessage);
+        }
+
+        //handle requests from CC
+        switch (absMessage.getMessageType()) {
+            case REPORT_MAX_RESOURCE_ID_REQUEST:
+                reportMaxResourceId();
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void reportMaxResourceId() throws Exception {
+        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
+        //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
+        long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
+                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+        maxResourceIdMsg.setMaxResourceId(maxResourceId);
+        sendMessage(maxResourceIdMsg, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-common/pom.xml b/asterix-common/pom.xml
index 512502b..02f651f 100644
--- a/asterix-common/pom.xml
+++ b/asterix-common/pom.xml
@@ -215,7 +215,15 @@
 			<groupId>org.apache.hyracks</groupId>
 			<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
 		</dependency>
-		<dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-control-cc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-control-nc</artifactId>
+        </dependency>
+        <dependency>
 			<groupId>com.fasterxml.jackson.core</groupId>
 			<artifactId>jackson-core</artifactId>
 			<version>2.2.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 94f5b2f..cd829e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 
 public interface IAsterixAppRuntimeContext {
@@ -65,7 +66,7 @@ public interface IAsterixAppRuntimeContext {
 
     public IDatasetLifecycleManager getDatasetLifecycleManager();
 
-    public ResourceIdFactory getResourceIdFactory();
+    public IResourceIdFactory getResourceIdFactory();
 
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
new file mode 100644
index 0000000..fbb9b86
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+
+public abstract class AbstractApplicationMessage implements IApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    private long id;
+
+    @Override
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
new file mode 100644
index 0000000..a2b94a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    public long maxResourceId;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE;
+    }
+
+    public long getMaxResourceId() {
+        return maxResourceId;
+    }
+
+    public void setMaxResourceId(long maxResourceId) {
+        this.maxResourceId = maxResourceId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
new file mode 100644
index 0000000..d2837ce
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    public long maxResourceId;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST;
+    }
+
+    public long getMaxResourceId() {
+        return maxResourceId;
+    }
+
+    public void setMaxResourceId(long maxResourceId) {
+        this.maxResourceId = maxResourceId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
new file mode 100644
index 0000000..daeb9c4
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.RESOURCE_ID_REQUEST;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
new file mode 100644
index 0000000..09c50d3
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+
+    private long resourceId;
+    private Exception exception;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.RESOURCE_ID_RESPONSE;
+    }
+
+    public long getResourceId() {
+        return resourceId;
+    }
+
+    public void setResourceId(long resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
new file mode 100644
index 0000000..61ab7cd
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessage;
+
+public interface IApplicationMessage extends IMessage {
+
+    public enum ApplicationMessageType {
+        RESOURCE_ID_REQUEST,
+        RESOURCE_ID_RESPONSE,
+        REPORT_MAX_RESOURCE_ID_REQUEST,
+        REPORT_MAX_RESOURCE_ID_RESPONSE
+    }
+
+    public abstract ApplicationMessageType getMessageType();
+
+    /**
+     * Sets a unique message id that identifies this message within an NC.
+     * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+     * when the callback is not null to notify the sender when the response to that message is received.
+     *
+     * @param messageId
+     */
+    public void setId(long messageId);
+
+    /**
+     * @return The unique message id if it has been set, otherwise 0.
+     */
+    public long getId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
new file mode 100644
index 0000000..3bad5fb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+public interface IApplicationMessageCallback {
+
+    /**
+     * Notifies the message sender when the response has been received.
+     *
+     * @param message
+     *            The response message
+     */
+    public void deliverMessageResponse(IApplicationMessage message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
new file mode 100644
index 0000000..3ff83b6
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface INCMessageBroker extends IMessageBroker {
+
+    /**
+     * Sends application message from this NC to the CC.
+     *
+     * @param message
+     * @param callback
+     * @throws Exception
+     */
+    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+
+    /**
+     * Sends the maximum resource id on this NC to the CC.
+     *
+     * @throws Exception
+     */
+    public void reportMaxResourceId() throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index d308564..6382af9 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -30,7 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public interface IAsterixAppRuntimeContextProvider {
 
@@ -52,7 +52,7 @@ public interface IAsterixAppRuntimeContextProvider {
 
     public ILocalResourceRepository getLocalResourceRepository();
 
-    public ResourceIdFactory getResourceIdFactory();
+    public IResourceIdFactory getResourceIdFactory();
 
     public IIOManager getIOManager();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 47203cf..c402bef 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -41,6 +41,7 @@ import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.metadata.api.IMetadataIndex;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.api.IValueExtractor;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataSecondaryIndexes;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -108,7 +109,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
 
-    private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
+    private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataIndexImmutableProperties.METADATA.getDatasetId());
 
     private IDatasetLifecycleManager datasetLifecycleManager;
     private ITransactionSubsystem transactionSubsystem;
@@ -1087,7 +1088,7 @@ public class MetadataNode implements IMetadataNode {
 
     @Override
     public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
-        int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
+        int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
         try {
             String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
             IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c068657..d76af86 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -20,7 +20,6 @@
 package org.apache.asterix.metadata.bootstrap;
 
 import java.io.File;
-import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -39,7 +38,6 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataException;
@@ -73,7 +71,6 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
@@ -223,7 +220,7 @@ public class MetadataBootstrap {
         }
     }
 
-    public static void stopUniverse() throws HyracksDataException {
+    public static void stopUniverse() {
         // Close all BTree files in BufferCache.
         // metadata datasets will be closed when the dataset life cycle manger is closed
     }
@@ -404,7 +401,7 @@ public class MetadataBootstrap {
                     LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
                     null, null, null, null, true);
             lsmBtree.create();
-            resourceID = runtimeContext.getResourceIdFactory().createId();
+            resourceID = index.getResourceID();
             ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
                     comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(),
                     runtimeContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
@@ -443,7 +440,7 @@ public class MetadataBootstrap {
         return metadataNodeName;
     }
 
-    public static void startDDLRecovery() throws RemoteException, ACIDException, MetadataException {
+    public static void startDDLRecovery() throws MetadataException {
         //#. clean up any record which has pendingAdd/DelOp flag 
         //   as traversing all records from DATAVERSE_DATASET to DATASET_DATASET, and then to INDEX_DATASET.
         String dataverseName = null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index 3a76db7..9ef9f84 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -79,9 +79,9 @@ public final class MetadataIndex implements IMetadataIndex {
     // PrimaryKeyField indexes used for secondary index operations
     protected final int[] primaryKeyIndexes;
 
-    public MetadataIndex(String datasetName, String indexName, int numFields, IAType[] keyTypes,
-            List<List<String>> keyNames, int numSecondaryIndexKeys, ARecordType payloadType, int datasetId,
-            boolean isPrimaryIndex, int[] primaryKeyIndexes) throws AsterixRuntimeException {
+    public MetadataIndex(MetadataIndexImmutableProperties indexImmutableProperties, int numFields, IAType[] keyTypes,
+            List<List<String>> keyNames, int numSecondaryIndexKeys, ARecordType payloadType, boolean isPrimaryIndex,
+            int[] primaryKeyIndexes) throws AsterixRuntimeException {
         // Sanity checks.
         if (keyTypes.length != keyNames.size()) {
             throw new AsterixRuntimeException("Unequal number of key types and names given.");
@@ -90,12 +90,8 @@ public final class MetadataIndex implements IMetadataIndex {
             throw new AsterixRuntimeException("Number of keys given is greater than total number of fields.");
         }
         // Set simple fields.
-        this.datasetName = datasetName;
-        if (indexName == null) {
-            this.indexName = datasetName;
-        } else {
-            this.indexName = indexName;
-        }
+        this.datasetName = indexImmutableProperties.getDatasetName();
+        this.indexName = indexImmutableProperties.getIndexName();
         this.keyTypes = keyTypes;
         this.keyNames = keyNames;
         this.payloadType = payloadType;
@@ -147,11 +143,12 @@ public final class MetadataIndex implements IMetadataIndex {
             }
         }
 
-        this.datasetId = new DatasetId(datasetId);
+        this.datasetId = new DatasetId(indexImmutableProperties.getDatasetId());
         this.isPrimaryIndex = isPrimaryIndex;
 
         //PrimaryKeyFieldIndexes
         this.primaryKeyIndexes = primaryKeyIndexes;
+        this.resourceId = indexImmutableProperties.getResourceId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
new file mode 100644
index 0000000..9b4d0d1
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.bootstrap;
+
+public enum MetadataIndexImmutableProperties {
+    METADATA(MetadataConstants.METADATA_DATAVERSE_NAME, 0, 0),
+    DATAVERSE("Dataverse", 1, 1),
+    DATASET("Dataset", 2, 2),
+    DATATYPE("Datatype", 3, 3),
+    INDEX("Index", 4, 4),
+    NODE("Node", 5, 5),
+    NODEGROUP("Nodegroup", 6, 6),
+    FUNCTION("Function", 7, 7),
+    DATASOURCE_ADAPTER("DatasourceAdapter", 8, 8),
+    LIBRARY("Library", 9, 9),
+    FEED("Feed", 10, 10),
+    FEED_ACTIVITY_DATASET_ID("FeedActivity", 11, 11),
+    FEED_POLICY("FeedPolicy", 12, 12),
+    COMPACTION_POLICY("CompactionPolicy", 13, 13),
+    EXTERNAL_FILE("ExternalFile", 14, 14),
+    GROUPNAME_ON_DATASET("GroupName", DATASET, 15),
+    DATATYPE_NAME_ON_DATASET("DatatypeName", DATASET, 16),
+    DATATYPE_NAME_ON_DATATYPE("DatatypeName", DATATYPE, 17);
+
+    private final String indexName;
+    private final int datasetId;
+    private final long resourceId;
+    private final MetadataIndexImmutableProperties dataset;
+
+    public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
+
+    private MetadataIndexImmutableProperties(String indexName, int datasetId, long resourceId) {
+        this.indexName = indexName;
+        this.datasetId = datasetId;
+        this.resourceId = resourceId;
+        //a primary index's dataset is itself
+        this.dataset = this;
+    }
+
+    private MetadataIndexImmutableProperties(String indexName, MetadataIndexImmutableProperties dataset,
+            long resourceId) {
+        this.indexName = indexName;
+        this.datasetId = dataset.datasetId;
+        this.resourceId = resourceId;
+        this.dataset = dataset;
+    }
+
+    public long getResourceId() {
+        return resourceId;
+    }
+
+    public String getIndexName() {
+        return indexName;
+    }
+
+    public String getDatasetName() {
+        return dataset.indexName;
+    }
+
+    public int getDatasetId() {
+        return dataset.datasetId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 9bcad8f..258e1c4 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -46,29 +46,9 @@ public class MetadataPrimaryIndexes {
     public static IMetadataIndex COMPACTION_POLICY_DATASET;
     public static IMetadataIndex EXTERNAL_FILE_DATASET;
 
-    public static final int METADATA_DATASET_ID = 0;
-    public static final int DATAVERSE_DATASET_ID = 1;
-    public static final int DATASET_DATASET_ID = 2;
-    public static final int DATATYPE_DATASET_ID = 3;
-    public static final int INDEX_DATASET_ID = 4;
-    public static final int NODE_DATASET_ID = 5;
-    public static final int NODEGROUP_DATASET_ID = 6;
-    public static final int FUNCTION_DATASET_ID = 7;
-    public static final int DATASOURCE_ADAPTER_DATASET_ID = 8;
-
-    public static final int LIBRARY_DATASET_ID = 9;
-    public static final int FEED_DATASET_ID = 10;
-    public static final int FEED_ACTIVITY_DATASET_ID = 11;
-    public static final int FEED_POLICY_DATASET_ID = 12;
-    public static final int COMPACTION_POLICY_DATASET_ID = 13;
-    public static final int EXTERNAL_FILE_DATASET_ID = 14;
-
-    public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
-
     /**
      * Create all metadata primary index descriptors. MetadataRecordTypes must
      * have been initialized before calling this init.
-     * 
      * @throws MetadataException
      *             If MetadataRecordTypes have not been initialized.
      */
@@ -79,63 +59,70 @@ public class MetadataPrimaryIndexes {
                     "Must initialize MetadataRecordTypes before initializing MetadataPrimaryIndexes");
         }
 
-        DATAVERSE_DATASET = new MetadataIndex("Dataverse", null, 2, new IAType[] { BuiltinType.ASTRING },
-                (Arrays.asList(Arrays.asList("DataverseName"))), 0, MetadataRecordTypes.DATAVERSE_RECORDTYPE,
-                DATAVERSE_DATASET_ID, true, new int[] { 0 });
-
-        DATASET_DATASET = new MetadataIndex("Dataset", null, 3,
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(
-                        Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 0,
-                MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID, true, new int[] { 0, 1 });
-
-        DATATYPE_DATASET = new MetadataIndex("Datatype", null, 3, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"))),
-                0, MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID, true, new int[] { 0, 1 });
-
-        INDEX_DATASET = new MetadataIndex("Index", null, 4, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
-                BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"),
-                Arrays.asList("IndexName"))), 0, MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID, true,
-                new int[] { 0, 1, 2 });
-
-        NODE_DATASET = new MetadataIndex("Node", null, 2, new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays
-                .asList("NodeName"))), 0, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID, true, new int[] { 0 });
-
-        NODEGROUP_DATASET = new MetadataIndex("Nodegroup", null, 2, new IAType[] { BuiltinType.ASTRING },
-                (Arrays.asList(Arrays.asList("GroupName"))), 0, MetadataRecordTypes.NODEGROUP_RECORDTYPE,
-                NODEGROUP_DATASET_ID, true, new int[] { 0 });
-
-        FUNCTION_DATASET = new MetadataIndex("Function", null, 4, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"),
-                Arrays.asList("Name"), Arrays.asList("Arity"))), 0, MetadataRecordTypes.FUNCTION_RECORDTYPE,
-                FUNCTION_DATASET_ID, true, new int[] { 0, 1, 2 });
-
-        DATASOURCE_ADAPTER_DATASET = new MetadataIndex("DatasourceAdapter", null, 3, new IAType[] {
-                BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"),
-                Arrays.asList("Name"))), 0, MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE,
-                DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0, 1 });
+        DATAVERSE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATAVERSE, 2,
+                new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"))), 0,
+                MetadataRecordTypes.DATAVERSE_RECORDTYPE, true, new int[] { 0 });
+
+        DATASET_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATASET, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 0,
+                MetadataRecordTypes.DATASET_RECORDTYPE, true, new int[] { 0, 1 });
+
+        DATATYPE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"))), 0,
+                MetadataRecordTypes.DATATYPE_RECORDTYPE, true, new int[] { 0, 1 });
+
+        INDEX_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.INDEX, 4,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"),
+                        Arrays.asList("IndexName"))),
+                0, MetadataRecordTypes.INDEX_RECORDTYPE, true, new int[] { 0, 1, 2 });
+
+        NODE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.NODE, 2, new IAType[] { BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("NodeName"))), 0, MetadataRecordTypes.NODE_RECORDTYPE, true,
+                new int[] { 0 });
+
+        NODEGROUP_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.NODEGROUP, 2,
+                new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("GroupName"))), 0,
+                MetadataRecordTypes.NODEGROUP_RECORDTYPE, true, new int[] { 0 });
+
+        FUNCTION_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FUNCTION, 4,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"), Arrays.asList("Arity"))), 0,
+                MetadataRecordTypes.FUNCTION_RECORDTYPE, true, new int[] { 0, 1, 2 });
+
+        DATASOURCE_ADAPTER_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATASOURCE_ADAPTER, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0,
+                MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, true,
+                new int[] { 0, 1 });
 
-        FEED_DATASET = new MetadataIndex("Feed", null, 3, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+        FEED_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FEED, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
                 (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("FeedName"))), 0,
-                MetadataRecordTypes.FEED_RECORDTYPE, FEED_DATASET_ID, true, new int[] { 0, 1 });
+                MetadataRecordTypes.FEED_RECORDTYPE, true, new int[] { 0, 1 });
 
-        LIBRARY_DATASET = new MetadataIndex("Library", null, 3,
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(
-                        Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0,
-                MetadataRecordTypes.LIBRARY_RECORDTYPE, LIBRARY_DATASET_ID, true, new int[] { 0, 1 });
+        LIBRARY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.LIBRARY, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0,
+                MetadataRecordTypes.LIBRARY_RECORDTYPE, true, new int[] { 0, 1 });
 
-        FEED_POLICY_DATASET = new MetadataIndex("FeedPolicy", null, 3, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("PolicyName"))), 0,
-                MetadataRecordTypes.FEED_POLICY_RECORDTYPE, FEED_POLICY_DATASET_ID, true, new int[] { 0, 1 });
+        FEED_POLICY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FEED_POLICY, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("PolicyName"))), 0,
+                MetadataRecordTypes.FEED_POLICY_RECORDTYPE, true, new int[] { 0, 1 });
 
-        COMPACTION_POLICY_DATASET = new MetadataIndex("CompactionPolicy", null, 3, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING },
+        COMPACTION_POLICY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.COMPACTION_POLICY, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
                 (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("CompactionPolicy"))), 0,
-                MetadataRecordTypes.COMPACTION_POLICY_RECORDTYPE, COMPACTION_POLICY_DATASET_ID, true,
+                MetadataRecordTypes.COMPACTION_POLICY_RECORDTYPE, true,
                 new int[] { 0, 1 });
 
-        EXTERNAL_FILE_DATASET = new MetadataIndex("ExternalFile", null, 4, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING, BuiltinType.AINT32 }, (Arrays.asList(Arrays.asList("DataverseName"),
-                Arrays.asList("DatasetName"), Arrays.asList("FileNumber"))), 0,
-                MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, EXTERNAL_FILE_DATASET_ID, true, new int[] { 0, 1, 2 });
+        EXTERNAL_FILE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.EXTERNAL_FILE, 4,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"),
+                        Arrays.asList("FileNumber"))),
+                0, MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, true, new int[] { 0, 1, 2 });
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
index 651021c..fbe339f 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
@@ -37,7 +37,6 @@ public class MetadataSecondaryIndexes {
     /**
      * Create all metadata secondary index descriptors. MetadataRecordTypes must
      * have been initialized before calling this init.
-     * 
      * @throws MetadataException
      *             If MetadataRecordTypes have not been initialized.
      */
@@ -48,21 +47,23 @@ public class MetadataSecondaryIndexes {
                     "Must initialize MetadataRecordTypes before initializing MetadataSecondaryIndexes.");
         }
 
-        GROUPNAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "GroupName", 3, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("GroupName"),
-                Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 1, null,
-                MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 1, 2 });
+        GROUPNAME_ON_DATASET_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.GROUPNAME_ON_DATASET, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("GroupName"), Arrays.asList("DataverseName"),
+                        Arrays.asList("DatasetName"))),
+                1, null, false, new int[] { 1, 2 });
 
-        DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "DatatypeName", 3, new IAType[] {
-                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(
-                Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"), Arrays.asList("DatasetName"))), 2, null,
-                MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 0, 2 });
+        DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE_NAME_ON_DATASET, 3,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"),
+                        Arrays.asList("DatasetName"))),
+                2, null, false, new int[] { 0, 2 });
 
-        DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex("Datatype", "DatatypeName", 3, new IAType[] {
-                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+        DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE_NAME_ON_DATATYPE,
+                3, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
                 (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("NestedDatatypeName"),
-                        Arrays.asList("TopDatatypeName"))), 2, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID, false,
-                new int[] { 0, 2 });
+                        Arrays.asList("TopDatatypeName"))),
+                2, null, false, new int[] { 0, 2 });
 
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 438805a..f2482da 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -185,4 +185,16 @@ public class AsterixClusterProperties {
     public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
         this.globalRecoveryCompleted = globalRecoveryCompleted;
     }
+
+    public static boolean isClusterActive() {
+        if (AsterixClusterProperties.INSTANCE.getCluster() == null) {
+            //this is a virtual cluster
+            return true;
+        }
+        return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
+    }
+
+    public static int getNumberOfNodes(){
+        return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
new file mode 100644
index 0000000..ca7ba51
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+/**
+ * A resource id factory that generates unique resource ids across all NCs by requesting unique ids from the cluster controller.
+ */
+public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
+
+    private final IApplicationContext appCtx;
+    private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+
+    public GlobalResourceIdFactory(IApplicationContext appCtx) {
+        this.appCtx = appCtx;
+        this.resourceIdResponseQ = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public long createId() throws HyracksDataException {
+        try {
+            ResourceIdRequestResponseMessage reponse = null;
+            //if there already exists a response, use it
+            if (resourceIdResponseQ.size() > 0) {
+                synchronized (resourceIdResponseQ) {
+                    if (resourceIdResponseQ.size() > 0) {
+                        reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+                    }
+                }
+            }
+            //if no response available or it has an exception, request a new one
+            if (reponse == null || reponse.getException() != null) {
+                ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
+                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this);
+                reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+                if (reponse.getException() != null) {
+                    throw new HyracksDataException(reponse.getException().getMessage());
+                }
+            }
+            return reponse.getResourceId();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void deliverMessageResponse(IApplicationMessage message) {
+        resourceIdResponseQ.offer(message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
new file mode 100644
index 0000000..ec42139
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import org.apache.hyracks.api.application.IApplicationContext;
+
+public class GlobalResourceIdFactoryProvider {
+
+    private final IApplicationContext appCtx;
+
+    public GlobalResourceIdFactoryProvider(IApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+
+    public GlobalResourceIdFactory createResourceIdFactory() {
+        return new GlobalResourceIdFactory(appCtx);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 1686e17..01a451c 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -28,7 +28,7 @@ import org.apache.hyracks.storage.common.IStorageManagerInterface;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
         ILSMIOOperationSchedulerProvider {
@@ -71,7 +71,7 @@ public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerP
     }
 
     @Override
-    public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
         return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
                 .getResourceIdFactory();
     }


Mime
View raw message