asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [08/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:50 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java
new file mode 100644
index 0000000..7417bc6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class AsterixExtension {
+    private final String className;
+    private final List<Pair<String, String>> args;
+
+    public AsterixExtension(String className, List<Pair<String, String>> args) {
+        this.className = className;
+        this.args = args;
+    }
+
+    public List<Pair<String, String>> getArgs() {
+        return args;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtensionProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtensionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtensionProperties.java
new file mode 100644
index 0000000..e8f2bcc
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtensionProperties.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import java.util.List;
+
+public class AsterixExtensionProperties extends AbstractAsterixProperties {
+
+    public AsterixExtensionProperties(AsterixPropertiesAccessor accessor) {
+        super(accessor);
+    }
+
+    public List<AsterixExtension> getExtensions() {
+        return accessor.getExtensions();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java
new file mode 100644
index 0000000..1576774
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import java.io.File;
+
+public class AsterixProperties {
+    //---------------------------- Directories ---------------------------//
+    private static final String VAR = File.separator + "var";
+    private static final String LIB = VAR + File.separator + "lib";
+    private static final String ASTERIXDB = LIB + File.separator + "asterixdb";
+    //----------------------------- Sections -----------------------------//
+    public static final String SECTION_ASTERIX = "asterix";
+    public static final String SECTION_PREFIX_EXTENSION = "extension/";
+    public static final String SECTION_CC = "cc";
+    public static final String SECTION_PREFIX_NC = "nc/";
+    //---------------------------- Properties ---=------------------------//
+    public static final String PROPERTY_CLUSTER_ADDRESS = "cluster.address";
+    public static final String PROPERTY_INSTANCE_NAME = "instance";
+    public static final String DEFAULT_INSTANCE_NAME = "DEFAULT_INSTANCE";
+    public static final String PROPERTY_METADATA_PORT = "metadata.port";
+    public static final String PROPERTY_COREDUMP_DIR = "coredumpdir";
+    public static final String DEFAULT_COREDUMP_DIR = String.join(File.separator, ASTERIXDB, "coredump");
+    public static final String PROPERTY_TXN_LOG_DIR = "txnlogdir";
+    public static final String DEFAULT_TXN_LOG_DIR = String.join(File.separator, ASTERIXDB, "txn-log");
+    public static final String PROPERTY_IO_DEV = "iodevices";
+    public static final String DEFAULT_IO_DEV = String.join(File.separator, ASTERIXDB, "iodevice");
+    public static final String PROPERTY_STORAGE_DIR = "storagedir";
+    public static final String DEFAULT_STORAGE_DIR = "storage";
+    public static final String PROPERTY_CLASS = "class";
+
+    private AsterixProperties() {
+    }
+
+    public static final String getSectionId(String prefix, String section) {
+        return section.substring(prefix.length());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 7309f0c..ea1ee31 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -28,10 +28,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
@@ -40,14 +39,20 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
+import org.apache.asterix.common.configuration.Extension;
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.common.configuration.Store;
 import org.apache.asterix.common.configuration.TransactionLogDir;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.ConfigUtil;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 public class AsterixPropertiesAccessor {
-    private static Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
 
     private final String instanceName;
     private final String metadataNodeName;
@@ -62,36 +67,22 @@ public class AsterixPropertiesAccessor {
     private final Map<String, String> asterixBuildProperties = new HashMap<>();
     private final Map<String, ClusterPartition[]> nodePartitionsMap;
     private final SortedMap<Integer, ClusterPartition> clusterPartitions = new TreeMap<>();
+    // For extensions
+    private final List<AsterixExtension> extensions;
 
     /**
      * Constructor which reads asterix-configuration.xml, the old way.
+     *
      * @throws AsterixException
+     * @throws IOException
      */
-    public AsterixPropertiesAccessor() throws AsterixException {
+    public AsterixPropertiesAccessor() throws AsterixException, IOException {
         String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
         if (fileName == null) {
             fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
         }
-
-        InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName);
-        if (is == null) {
-            try {
-                fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
-                is = new FileInputStream(fileName);
-            } catch (FileNotFoundException fnf) {
-                throw new AsterixException("Could not find configuration file " + fileName);
-            }
-        }
-
-        AsterixConfiguration asterixConfiguration = null;
+        AsterixConfiguration asterixConfiguration = configure(fileName);
         cfg = null;
-        try {
-            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
-            Unmarshaller unmarshaller = ctx.createUnmarshaller();
-            asterixConfiguration = (AsterixConfiguration) unmarshaller.unmarshal(is);
-        } catch (JAXBException e) {
-            throw new AsterixException("Failed to read configuration file " + fileName);
-        }
         instanceName = asterixConfiguration.getInstanceName();
         metadataNodeName = asterixConfiguration.getMetadataNode();
         List<Store> configuredStores = asterixConfiguration.getStore();
@@ -117,7 +108,16 @@ public class AsterixPropertiesAccessor {
             nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
         }
-        asterixConfigurationParams = new HashMap<String, Property>();
+
+        // Get extensions
+        extensions = new ArrayList<>();
+        if (asterixConfiguration.getExtensions() != null) {
+            for (Extension ext : asterixConfiguration.getExtensions().getExtension()) {
+                extensions.add(ConfigUtil.toAsterixExtension(ext));
+            }
+        }
+
+        asterixConfigurationParams = new HashMap<>();
         for (Property p : asterixConfiguration.getProperty()) {
             asterixConfigurationParams.put(p.getName(), p);
         }
@@ -130,56 +130,56 @@ public class AsterixPropertiesAccessor {
         loadAsterixBuildProperties();
     }
 
+    private AsterixConfiguration configure(String fileName) throws IOException, AsterixException {
+        try (InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName)) {
+            if (is != null) {
+                return configure(is, fileName);
+            }
+        }
+        try (FileInputStream is = new FileInputStream(fileName)) {
+            return configure(is, fileName);
+        } catch (FileNotFoundException fnf1) {
+            LOGGER.warn("Failed to get configuration file " + fileName + " as FileInputStream. FileNotFoundException");
+            LOGGER.warn("Attempting to get default configuration file " + GlobalConfig.DEFAULT_CONFIG_FILE_NAME
+                    + " as FileInputStream");
+            try (FileInputStream fis = new FileInputStream(GlobalConfig.DEFAULT_CONFIG_FILE_NAME)) {
+                return configure(fis, GlobalConfig.DEFAULT_CONFIG_FILE_NAME);
+            } catch (FileNotFoundException fnf2) {
+                fnf1.addSuppressed(fnf2);
+                throw new AsterixException("Could not find configuration file " + fileName, fnf1);
+            }
+        }
+    }
+
+    private AsterixConfiguration configure(InputStream is, String fileName) throws AsterixException {
+        try {
+            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Unmarshaller unmarshaller = ctx.createUnmarshaller();
+            return (AsterixConfiguration) unmarshaller.unmarshal(is);
+        } catch (JAXBException e) {
+            throw new AsterixException("Failed to read configuration file " + fileName, e);
+        }
+    }
+
     /**
      * Constructor which wraps an IApplicationConfig.
      */
     public AsterixPropertiesAccessor(IApplicationConfig cfg) throws AsterixException {
         this.cfg = cfg;
-        instanceName = cfg.getString("asterix", "instance", "DEFAULT_INSTANCE");
+        instanceName = cfg.getString(AsterixProperties.SECTION_ASTERIX, AsterixProperties.PROPERTY_INSTANCE_NAME,
+                AsterixProperties.DEFAULT_INSTANCE_NAME);
         String mdNode = null;
         nodePartitionsMap = new HashMap<>();
-        int uniquePartitionId = 0;
-
+        MutableInt uniquePartitionId = new MutableInt(0);
+        extensions = new ArrayList<>();
         // Iterate through each configured NC.
         for (String section : cfg.getSections()) {
-            if (!section.startsWith("nc/")) {
-                continue;
-            }
-            String ncId = section.substring(3);
-
-            // Here we figure out which is the metadata node. If any NCs
-            // declare "metadata.port", use that one; otherwise just use the first.
-            if (mdNode == null) {
-                mdNode = ncId;
-            }
-            if (cfg.getString(section, "metadata.port") != null) {
-                // QQQ But we don't actually *honor* metadata.port yet!
-                mdNode = ncId;
-            }
-
-            // Now we assign the coredump and txnlog directories for this node.
-            // QQQ Default values? Should they be specified here? Or should there
-            // be a default.ini? Certainly wherever they are, they should be platform-dependent.
-            coredumpConfig.put(ncId, cfg.getString(section, "coredumpdir", "/var/lib/asterixdb/coredump"));
-            transactionLogDirs.put(ncId, cfg.getString(section, "txnlogdir", "/var/lib/asterixdb/txn-log"));
-
-            // Now we create an array of ClusterPartitions for all the partitions
-            // on this NC.
-            String[] iodevices = cfg.getString(section, "iodevices", "/var/lib/asterixdb/iodevice").split(",");
-            String storageSubdir = cfg.getString(section, "storagedir", "storage");
-            String[] nodeStores = new String[iodevices.length];
-            ClusterPartition[] nodePartitions = new ClusterPartition[iodevices.length];
-            for (int i = 0; i < nodePartitions.length; i++) {
-                // Construct final storage path from iodevice dir + storage subdir.
-                nodeStores[i] = iodevices[i] + File.separator + storageSubdir;
-                // Create ClusterPartition instances for this NC.
-                ClusterPartition partition = new ClusterPartition(uniquePartitionId++, ncId, i);
-                clusterPartitions.put(partition.getPartitionId(), partition);
-                nodePartitions[i] = partition;
+            if (section.startsWith(AsterixProperties.SECTION_PREFIX_NC)) {
+                mdNode = configureNc(section, mdNode, uniquePartitionId);
+            } else if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) {
+                String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION, section);
+                configureExtension(className, section);
             }
-            stores.put(ncId, nodeStores);
-            nodePartitionsMap.put(ncId, nodePartitions);
-            nodeNames.add(ncId);
         }
 
         metadataNodeName = mdNode;
@@ -187,6 +187,58 @@ public class AsterixPropertiesAccessor {
         loadAsterixBuildProperties();
     }
 
+    private void configureExtension(String className, String section) {
+        Set<String> keys = cfg.getKeys(section);
+        List<Pair<String, String>> kvs = new ArrayList<>();
+        for (String key : keys) {
+            String value = cfg.getString(section, key);
+            kvs.add(new Pair<>(key, value));
+        }
+        extensions.add(new AsterixExtension(className, kvs));
+    }
+
+    private String configureNc(String section, String mdNode, MutableInt uniquePartitionId) {
+        String ncId = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_NC, section);
+        String newMetadataNode = mdNode;
+
+        // Here we figure out which is the metadata node. If any NCs
+        // declare "metadata.port", use that one; otherwise just use the first.
+        if (mdNode == null || cfg.getString(section, AsterixProperties.PROPERTY_METADATA_PORT) != null) {
+            // QQQ But we don't actually *honor* metadata.port yet!
+            newMetadataNode = ncId;
+        }
+
+        // Now we assign the coredump and txnlog directories for this node.
+        // QQQ Default values? Should they be specified here? Or should there
+        // be a default.ini? Certainly wherever they are, they should be platform-dependent.
+        coredumpConfig.put(ncId, cfg.getString(section, AsterixProperties.PROPERTY_COREDUMP_DIR,
+                AsterixProperties.DEFAULT_COREDUMP_DIR));
+        transactionLogDirs.put(ncId,
+                cfg.getString(section, AsterixProperties.PROPERTY_TXN_LOG_DIR, AsterixProperties.DEFAULT_TXN_LOG_DIR));
+
+        // Now we create an array of ClusterPartitions for all the partitions
+        // on this NC.
+        String[] iodevices = cfg.getString(section, AsterixProperties.PROPERTY_IO_DEV,
+                AsterixProperties.DEFAULT_IO_DEV).split(",");
+        String storageSubdir = cfg.getString(section, AsterixProperties.PROPERTY_STORAGE_DIR,
+                AsterixProperties.DEFAULT_STORAGE_DIR);
+        String[] nodeStores = new String[iodevices.length];
+        ClusterPartition[] nodePartitions = new ClusterPartition[iodevices.length];
+        for (int i = 0; i < nodePartitions.length; i++) {
+            // Construct final storage path from iodevice dir + storage subdir.
+            nodeStores[i] = iodevices[i] + File.separator + storageSubdir;
+            // Create ClusterPartition instances for this NC.
+            ClusterPartition partition = new ClusterPartition(uniquePartitionId.getValue(), ncId, i);
+            uniquePartitionId.increment();
+            clusterPartitions.put(partition.getPartitionId(), partition);
+            nodePartitions[i] = partition;
+        }
+        stores.put(ncId, nodeStores);
+        nodePartitionsMap.put(ncId, nodePartitions);
+        nodeNames.add(ncId);
+        return newMetadataNode;
+    }
+
     private void loadAsterixBuildProperties() throws AsterixException {
         Properties gitProperties = new Properties();
         try {
@@ -242,14 +294,14 @@ public class AsterixPropertiesAccessor {
         try {
             return interpreter.interpret(value);
         } catch (IllegalArgumentException e) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                StringBuilder msg =
-                        new StringBuilder("Invalid property value '" + value + "' for property '" + property + "'.\n");
+            if (LOGGER.isEnabledFor(Level.ERROR)) {
+                StringBuilder msg = new StringBuilder(
+                        "Invalid property value '" + value + "' for property '" + property + "'.\n");
                 if (p != null) {
                     msg.append("See the description: \n" + p.getDescription() + "\n");
                 }
                 msg.append("Default = " + defaultValue);
-                LOGGER.severe(msg.toString());
+                LOGGER.error(msg.toString());
             }
             throw e;
         }
@@ -271,4 +323,8 @@ public class AsterixPropertiesAccessor {
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return clusterPartitions;
     }
+
+    public List<AsterixExtension> getExtensions() {
+        return extensions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
deleted file mode 100644
index 943e385..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.common.config;
-
-public class MetadataConstants {
-
-    // Name of the dataverse the metadata lives in.
-    public final static String METADATA_DATAVERSE_NAME = "Metadata";
-
-    // Name of the node group where metadata is stored on.
-    public final static String METADATA_NODEGROUP_NAME = "MetadataGroup";
-
-    // Name of the default nodegroup where internal/feed datasets will be partitioned
-    // if an explicit nodegroup is not specified at the time of creation of a dataset
-    public static final String METADATA_DEFAULT_NODEGROUP_NAME = "DEFAULT_NG_ALL_NODES";
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 66a01e0..a56d6f6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -21,6 +21,8 @@ package org.apache.asterix.common.exceptions;
 public class ErrorCode {
     public static final String ASTERIX = "ASX";
     public static final int ERROR_CASTING_FIELD = 0;
+    public static final int ERROR_EXTENSION_CONFLICT = 1;
+    public static final int ERROR_PARSE_ERROR = 2;
 
     private ErrorCode() {
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 7940680..9742a6c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -36,7 +36,8 @@ public interface IApplicationMessage extends IMessage {
         COMPLETE_FAILBACK_REQUEST,
         COMPLETE_FAILBACK_RESPONSE,
         REPLICA_EVENT,
-        ACTIVE_ENTITY_MESSAGE
+        ACTIVE_ENTITY_TO_CC_MESSAGE,
+        ACTIVE_MANAGER_MESSAGE
     }
 
     public abstract ApplicationMessageType getMessageType();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ConfigUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ConfigUtil.java
new file mode 100644
index 0000000..5db1693
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ConfigUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.util.ArrayList;
+
+import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.configuration.Extension;
+import org.apache.asterix.common.configuration.Property;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class ConfigUtil {
+
+    private ConfigUtil() {
+    }
+
+    public static AsterixExtension toAsterixExtension(Extension ext) {
+        String className = ext.getExtensionClassName();
+        ArrayList<Pair<String, String>> args = new ArrayList<>();
+        for (Property property : ext.getProperty()) {
+            args.add(new Pair<>(property.getName(), property.getValue()));
+        }
+        return new AsterixExtension(className, args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd
index bb99319..d6a35cd 100644
--- a/asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -56,6 +56,9 @@
     <xs:element
         name="txnLogDirPath"
         type="xs:string" />
+    <xs:element
+        name="extensionClassName"
+        type="xs:string" />
 
     <!-- definition of complex elements -->
     <xs:element name="store">
@@ -95,6 +98,29 @@
         </xs:complexType>
     </xs:element>
 
+    <xs:element name="extension">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="mg:extensionClassName"
+                maxOccurs="1"
+                minOccurs="1"/>
+                <xs:element ref="mg:property"
+                    minOccurs="0"
+                    maxOccurs="unbounded" />
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="extensions">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="mg:extension"
+                    minOccurs="0"
+                    maxOccurs="unbounded">
+                </xs:element>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
 
     <xs:element name="asterixConfiguration">
         <xs:complexType>
@@ -117,6 +143,9 @@
                 <xs:element
                     ref="mg:transactionLogDir"
                     maxOccurs="unbounded" />
+                <xs:element ref="mg:extensions"
+                    minOccurs="0"
+                    maxOccurs="1" />
                 <xs:element
                     ref="mg:property"
                     minOccurs="0"

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
deleted file mode 100644
index 1752054..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.util.Map;
-
-public interface ISubscriberRuntime {
-
-    public Map<String, String> getFeedPolicy();
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 6159908..d4e3641 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -76,7 +76,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
         this.spiller =
                 fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
-                                + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition(),
+                                + runtimeId.getRuntimeName() + "_" + runtimeId.getPartition(),
                         fpa.getMaxSpillOnDisk()) : null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 40d2500..c40fed6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -158,7 +158,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
     }
 
     private void handleIntakePartitionStarts(ActiveEvent message, ActiveJob jobInfo) {
-        if (feedPipeline.get(message.getFeedId()).first.decrementAndGet() == 0) {
+        if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) {
             ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
             jobInfo.setState(ActivityState.ACTIVE);
             notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED);
@@ -339,7 +339,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         return locations;
     }
 
-    private void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
+    private synchronized void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
         if (subscribers != null && !subscribers.isEmpty()) {
             for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
                 subscriber.handleFeedEvent(event);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 9c7a319..6c04b2d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -19,8 +19,8 @@
 package org.apache.asterix.external.feed.runtime;
 
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.log4j.Logger;
 
 /**
@@ -30,13 +30,12 @@ public class AdapterExecutor implements Runnable {
 
     private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
 
-    private final DistributeFeedFrameWriter writer;     // A writer that sends frames to multiple receivers (that can
+    private final IFrameWriter writer; // A writer that sends frames to multiple receivers (that can
     // increase or decrease at any time)
-    private final FeedAdapter adapter;                 // The adapter
+    private final FeedAdapter adapter; // The adapter
     private final AdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
 
-    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, FeedAdapter adapter,
-            AdapterRuntimeManager adapterManager) {
+    public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter, AdapterRuntimeManager adapterManager) {
         this.writer = writer;
         this.adapter = adapter;
         this.adapterManager = adapterManager;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index f2f0747..424f2dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.log4j.Logger;
 
 /**
@@ -34,27 +34,26 @@ public class AdapterRuntimeManager {
 
     private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
 
-    private final EntityId feedId;                    // (dataverse-feed)
+    private final EntityId feedId; // (dataverse-feed)
 
-    private final FeedAdapter feedAdapter;         // The adapter
+    private final FeedAdapter feedAdapter; // The adapter
 
-    private final AdapterExecutor adapterExecutor;  // The executor for the adapter
+    private final AdapterExecutor adapterExecutor; // The executor for the adapter
 
-    private final int partition;                    // The partition number
+    private final int partition; // The partition number
 
-    private final ExecutorService executorService;  // Executor service to run/shutdown the adapter executor
+    private final ExecutorService executorService; // Executor service to run/shutdown the adapter executor
 
-    private IngestionRuntime ingestionRuntime;      // Runtime representing the ingestion stage of a feed
+    private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed
 
     private volatile boolean done = false;
     private volatile boolean failed = false;
 
-    public AdapterRuntimeManager(EntityId feedId, FeedAdapter feedAdapter, DistributeFeedFrameWriter writer,
-            int partition) {
-        this.feedId = feedId;
+    public AdapterRuntimeManager(EntityId entityId, FeedAdapter feedAdapter, IFrameWriter writer, int partition) {
+        this.feedId = entityId;
         this.feedAdapter = feedAdapter;
         this.partition = partition;
-        this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+        this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
         this.executorService = Executors.newSingleThreadExecutor();
     }
 
@@ -82,7 +81,6 @@ public class AdapterRuntimeManager {
                 // stop() returned false, we try to force shutdown
                 executorService.shutdownNow();
             }
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 294642e..821a0b1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -20,13 +20,13 @@ package org.apache.asterix.external.feed.runtime;
 
 import java.util.Map;
 
-import org.apache.asterix.active.ActiveRuntime;
 import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Represents the feed runtime that collects feed tuples from another feed.
@@ -34,18 +34,19 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
  * intake job. For a secondary feed, tuples are collected from the intake/compute
  * runtime associated with the source feed.
  */
-public class CollectionRuntime extends ActiveRuntime implements ISubscriberRuntime {
+public class CollectionRuntime implements IActiveRuntime {
 
-    private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset]
-    private final ISubscribableRuntime sourceRuntime; // Runtime that provides the data
-    private final Map<String, String> feedPolicy; // Policy associated with the feed
-    private final FeedFrameCollector frameCollector; // Collector that can be plugged into a frame distributor
+    private final FeedConnectionId connectionId;        // [Dataverse - Feed - Dataset]
+    private final ISubscribableRuntime sourceRuntime;   // Runtime that provides the data
+    private final Map<String, String> feedPolicy;       // Policy associated with the feed
+    private final FeedFrameCollector frameCollector;    // Collector that can be plugged into a frame distributor
     private final IHyracksTaskContext ctx;
+    private final ActiveRuntimeId runtimeId;
 
     public CollectionRuntime(FeedConnectionId connectionId, ActiveRuntimeId runtimeId,
             ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy, IHyracksTaskContext ctx,
             FeedFrameCollector frameCollector) {
-        super(runtimeId);
+        this.runtimeId = runtimeId;
         this.connectionId = connectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
@@ -68,7 +69,6 @@ public class CollectionRuntime extends ActiveRuntime implements ISubscriberRunti
                 || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
     }
 
-    @Override
     public Map<String, String> getFeedPolicy() {
         return feedPolicy;
     }
@@ -88,4 +88,13 @@ public class CollectionRuntime extends ActiveRuntime implements ISubscriberRunti
     public IHyracksTaskContext getCtx() {
         return ctx;
     }
+
+    @Override
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    @Override
+    public void stop() throws HyracksDataException, InterruptedException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 9661890..34237c4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -22,7 +22,7 @@ import java.util.logging.Level;
 
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -37,9 +37,9 @@ public class IngestionRuntime extends SubscribableRuntime {
     private final IHyracksTaskContext ctx;
     private int numSubscribers = 0;
 
-    public IngestionRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+    public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
             AdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
-        super(feedId, runtimeId, feedWriter);
+        super(entityId, runtimeId, feedWriter);
         this.adapterRuntimeManager = adaptorRuntimeManager;
         this.ctx = ctx;
     }
@@ -53,7 +53,7 @@ public class IngestionRuntime extends SubscribableRuntime {
             TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
             TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE,
                     TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx());
-            adapterRuntimeManager.start();
+            start();
         }
         numSubscribers++;
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -65,7 +65,7 @@ public class IngestionRuntime extends SubscribableRuntime {
     public synchronized void unsubscribe(CollectionRuntime collectionRuntime) throws InterruptedException {
         numSubscribers--;
         if (numSubscribers == 0) {
-            adapterRuntimeManager.stop();
+            stop();
         }
         subscribers.remove(collectionRuntime);
     }
@@ -75,7 +75,7 @@ public class IngestionRuntime extends SubscribableRuntime {
     }
 
     public void terminate() {
-        for (ISubscriberRuntime subscriber : subscribers) {
+        for (IActiveRuntime subscriber : subscribers) {
             try {
                 unsubscribe((CollectionRuntime) subscriber);
             } catch (Exception e) {
@@ -86,4 +86,12 @@ public class IngestionRuntime extends SubscribableRuntime {
         }
     }
 
+    public void start() {
+        adapterRuntimeManager.start();
+    }
+
+    @Override
+    public void stop() throws InterruptedException {
+        adapterRuntimeManager.stop();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
index 423e599..fb70fdb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
@@ -22,25 +22,30 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveRuntime;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 
-public abstract class SubscribableRuntime extends ActiveRuntime implements ISubscribableRuntime {
+public abstract class SubscribableRuntime implements ISubscribableRuntime {
 
     protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
     protected final EntityId feedId;
-    protected final List<ISubscriberRuntime> subscribers;
+    protected final List<IActiveRuntime> subscribers;
     protected final DistributeFeedFrameWriter dWriter;
+    protected final ActiveRuntimeId runtimeId;
 
     public SubscribableRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) {
-        super(runtimeId);
+        this.runtimeId = runtimeId;
         this.feedId = feedId;
         this.dWriter = dWriter;
-        this.subscribers = new ArrayList<ISubscriberRuntime>();
+        this.subscribers = new ArrayList<>();
+    }
+
+    @Override
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId;
     }
 
     public EntityId getFeedId() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
index 533d119..7acb1f8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.Date;
 
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.commons.lang3.ObjectUtils;
 
 public class ExternalFile implements Serializable, Comparable<ExternalFile> {
 
@@ -118,21 +119,21 @@ public class ExternalFile implements Serializable, Comparable<ExternalFile> {
     }
 
     @Override
+    public int hashCode() {
+        return ObjectUtils.hashCodeMulti(dataverseName, datasetName, fileName);
+    }
+
+    @Override
     public boolean equals(Object obj) {
-        if (obj == null)
-            return false;
-        if (obj == this)
+        if (obj == this) {
             return true;
-        if (!(obj instanceof ExternalFile))
+        }
+        if (!(obj instanceof ExternalFile)) {
             return false;
+        }
         ExternalFile anotherFile = (ExternalFile) obj;
-        if (fileNumber != anotherFile.fileNumber)
-            return false;
-        if (!dataverseName.equals(anotherFile.dataverseName))
-            return false;
-        if (!datasetName.equals(anotherFile.datasetName))
-            return false;
-        return true;
+        return fileNumber == anotherFile.fileNumber && ObjectUtils.equals(dataverseName, anotherFile.getDataverseName())
+                && ObjectUtils.equals(datasetName, anotherFile.getDatasetName());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 84c2cb4..266669b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -78,9 +78,9 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
         ActiveManager feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
+                .getApplicationContext().getApplicationObject()).getActiveManager();
         ActiveRuntimeId sourceRuntimeId = new ActiveRuntimeId(sourceFeedId, subscriptionLocation.toString(), partition);
-        ISubscribableRuntime sourceRuntime = (ISubscribableRuntime) feedManager.getSubscribableRuntime(sourceRuntimeId);
+        ISubscribableRuntime sourceRuntime = (ISubscribableRuntime) feedManager.getRuntime(sourceRuntimeId);
         return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition, sourceRuntime);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 231fe99..ee9a186 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -21,8 +21,8 @@ package org.apache.asterix.external.operators;
 import java.util.Map;
 
 import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.ActivePartitionMessage;
 import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
@@ -46,7 +46,7 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
     private final FeedConnectionId connectionId;
     private final Map<String, String> feedPolicy;
     private final FeedPolicyAccessor policyAccessor;
-    private final ActiveManager feedManager;
+    private final ActiveManager activeManager;
     private final ISubscribableRuntime sourceRuntime;
     private final IHyracksTaskContext ctx;
     private CollectionRuntime collectRuntime;
@@ -59,8 +59,8 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
         this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
-        this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
-                .getApplicationObject()).getFeedManager();
+        this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getActiveManager();
     }
 
     @Override
@@ -72,20 +72,19 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
             FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
             if (policyAccessor.bufferingEnabled()) {
                 writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
-                        feedManager.getFramePool());
+                        activeManager.getFramePool());
             } else {
                 writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor);
             }
             collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx,
                     new FeedFrameCollector(policyAccessor, writer, connectionId));
-            feedManager.getActiveRuntimeRegistry().registerRuntime(collectRuntime);
+            activeManager.registerRuntime(collectRuntime);
             sourceRuntime.subscribe(collectRuntime);
             // Notify CC that Collection started
-            ctx.sendApplicationMessageToCC(
-                    new ActivePartitionMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId(), null),
-                    null);
+            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
+                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
             collectRuntime.waitTillCollectionOver();
-            feedManager.getActiveRuntimeRegistry().deregisterRuntime(collectRuntime.getRuntimeId());
+            activeManager.deregisterRuntime(collectRuntime.getRuntimeId());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index ffa451b..7c8fe14 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -19,9 +19,9 @@
 package org.apache.asterix.external.operators;
 
 import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.ActivePartitionMessage;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
@@ -62,7 +62,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
     @Override
     public void initialize() throws HyracksDataException {
         ActiveManager feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
+                .getApplicationContext().getApplicationObject()).getActiveManager();
         AdapterRuntimeManager adapterRuntimeManager = null;
         DistributeFeedFrameWriter frameDistributor = null;
         IngestionRuntime ingestionRuntime = null;
@@ -80,8 +80,8 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
             ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);
             feedManager.registerRuntime(ingestionRuntime);
             // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(feedId, ctx.getJobletContext().getJobId(), null),
-                    null);
+            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
+                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
             // open the distributor
             open = true;
             frameDistributor.open();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 5f92327..1dce6be 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -63,7 +63,7 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
         this.partition = partition;
         IAsterixAppRuntimeContext runtimeCtx =
                 (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
-        this.feedManager = (ActiveManager) runtimeCtx.getFeedManager();
+        this.feedManager = (ActiveManager) runtimeCtx.getActiveManager();
     }
 
     @Override
@@ -98,8 +98,7 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
         EntityId sourceFeedId = endFeedMessage.getSourceFeedId();
         ActiveRuntimeId subscribableRuntimeId =
                 new ActiveRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE.toString(), partition);
-        ISubscribableRuntime feedRuntime =
-                (ISubscribableRuntime) feedManager.getSubscribableRuntime(subscribableRuntimeId);
+        ISubscribableRuntime feedRuntime = (ISubscribableRuntime) feedManager.getRuntime(subscribableRuntimeId);
         AdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
         adapterRuntimeManager.stop();
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -129,8 +128,7 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
             }
 
             runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType.toString(), partition);
-            CollectionRuntime feedRuntime =
-                    (CollectionRuntime) feedManager.getActiveRuntimeRegistry().getRuntime(runtimeId);
+            CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getRuntime(runtimeId);
             if (feedRuntime != null) {
                 feedRuntime.getSourceRuntime().unsubscribe(feedRuntime);
             }
@@ -150,11 +148,10 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     ActiveRuntimeId feedSubscribableRuntimeId = new ActiveRuntimeId(connectionId.getFeedId(),
                             FeedRuntimeType.COMPUTE.toString(), partition);
                     ISubscribableRuntime feedRuntime =
-                            (ISubscribableRuntime) feedManager.getSubscribableRuntime(feedSubscribableRuntimeId);
+                            (ISubscribableRuntime) feedManager.getRuntime(feedSubscribableRuntimeId);
                     runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(),
                             FeedRuntimeType.COMPUTE_COLLECT.toString(), partition);
-                    CollectionRuntime feedCollectionRuntime =
-                            (CollectionRuntime) feedManager.getActiveRuntimeRegistry().getRuntime(runtimeId);
+                    CollectionRuntime feedCollectionRuntime = (CollectionRuntime) feedManager.getRuntime(runtimeId);
                     feedRuntime.unsubscribe(feedCollectionRuntime);
                     break;
                 default:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 37a42a7..ff996aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -24,7 +24,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.ActiveRuntime;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
@@ -62,12 +61,6 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
     private FeedPolicyEnforcer policyEnforcer;
 
     /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     */
-    private ActiveRuntime feedRuntime;
-
-    /**
      * A unique identifier for the feed instance. A feed instance represents
      * the flow of data from a feed to a dataset.
      **/
@@ -112,7 +105,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
-                .getApplicationObject()).getFeedManager();
+                .getApplicationObject()).getActiveManager();
         this.message = new VSizeFrame(ctx);
         TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.opDesc = feedMetaOperatorDescriptor;
@@ -142,8 +135,6 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         } else {
             writer = new SyncFeedRuntimeInputHandler(ctx, coreOperator, fta);
         }
-        feedRuntime = new ActiveRuntime(runtimeId);
-        feedManager.getActiveRuntimeRegistry().registerRuntime(feedRuntime);
     }
 
     @Override
@@ -164,19 +155,11 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            deregister();
-        } finally {
-            if (opened) {
-                writer.close();
-            }
+        if (opened) {
+            writer.close();
         }
     }
 
-    private void deregister() {
-        feedManager.getActiveRuntimeRegistry().deregisterRuntime(feedRuntime.getRuntimeId());
-    }
-
     @Override
     public void flush() throws HyracksDataException {
         writer.flush();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 95bebad..2c3b62f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -24,7 +24,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.ActiveRuntime;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
@@ -34,7 +33,6 @@ import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -60,12 +58,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     private final FeedPolicyEnforcer policyEnforcer;
 
     /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     */
-    private ActiveRuntime feedRuntime;
-
-    /**
      * A unique identifier for the feed instance. A feed instance represents
      * the flow of data from a feed to a dataset.
      **/
@@ -106,7 +98,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
-                .getApplicationObject()).getFeedManager();
+                .getApplicationObject()).getActiveManager();
         this.targetId = targetId;
         this.message = new VSizeFrame(ctx);
         TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
@@ -116,8 +108,8 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     @Override
     public void open() throws HyracksDataException {
-        ActiveRuntimeId runtimeId =
-                new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition);
+        ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(),
+                runtimeType.toString() + "." + targetId, partition);
         try {
             initializeNewFeedRuntime(runtimeId);
             insertOperator.open();
@@ -135,7 +127,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
                     (AsterixLSMInsertDeleteOperatorNodePushable) insertOperator;
             if (!indexOp.isPrimary()) {
                 writer = insertOperator;
-                setupBasicRuntime(writer);
                 return;
             }
         }
@@ -145,14 +136,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         } else {
             writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta);
         }
-        setupBasicRuntime(writer);
-    }
-
-    private void setupBasicRuntime(IFrameWriter frameWriter) throws Exception {
-        ActiveRuntimeId runtimeId =
-                new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition);
-        feedRuntime = new ActiveRuntime(runtimeId);
-        feedManager.getActiveRuntimeRegistry().registerRuntime(feedRuntime);
     }
 
     @Override
@@ -173,15 +156,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            writer.close();
-        } finally {
-            deregister();
-        }
-    }
-
-    private void deregister() {
-        feedManager.getActiveRuntimeRegistry().deregisterRuntime(feedRuntime.getRuntimeId());
+        writer.close();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/rewrites/AqlQueryRewriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/rewrites/AqlQueryRewriter.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/rewrites/AqlQueryRewriter.java
index 65f61f2..5edc521 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/rewrites/AqlQueryRewriter.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/rewrites/AqlQueryRewriter.java
@@ -112,8 +112,8 @@ class AqlQueryRewriter implements IQueryRewriter {
         buildOtherUdfs(topExpr.getBody(), otherFDecls, funIds);
         declaredFunctions.addAll(otherFDecls);
         if (!declaredFunctions.isEmpty()) {
-            AQLInlineUdfsVisitor visitor = new AQLInlineUdfsVisitor(context, new AQLRewriterFactory(),
-                    declaredFunctions, metadataProvider);
+            AQLInlineUdfsVisitor visitor =
+                    new AQLInlineUdfsVisitor(context, new AQLRewriterFactory(), declaredFunctions, metadataProvider);
             while (topExpr.accept(visitor, declaredFunctions)) {
                 // loop until no more changes
             }
@@ -154,9 +154,9 @@ class AqlQueryRewriter implements IQueryRewriter {
                 FunctionDecl functionDecl = functionParser.getFunctionDecl(function);
                 if (functionDecl != null) {
                     if (functionDecls.contains(functionDecl)) {
-                        throw new AsterixException("Recursive invocation "
-                                + functionDecls.get(functionDecls.size() - 1).getSignature() + " <==> "
-                                + functionDecl.getSignature());
+                        throw new AsterixException(
+                                "Recursive invocation " + functionDecls.get(functionDecls.size() - 1).getSignature()
+                                        + " <==> " + functionDecl.getSignature());
                     }
                     functionDecls.add(functionDecl);
                     buildOtherUdfs(functionDecl.getFuncBody(), functionDecls, declaredFunctions);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 836de6a..d4bf6bc 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -204,4 +204,9 @@ public class SubscribeFeedStatement implements Statement {
     public String[] getLocations() {
         return locations;
     }
+
+    @Override
+    public byte getCategory() {
+        return Category.PROCEDURE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 9cffda0..f46f13d 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -43,7 +43,6 @@ import org.apache.asterix.common.annotations.TypeDataGen;
 import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.aql.clause.DistinctClause;
@@ -106,7 +105,7 @@ import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DeleteStatement;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
-import org.apache.asterix.lang.common.statement.DropStatement;
+import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.ExternalDetailsDecl;
 import org.apache.asterix.lang.common.statement.FeedDropStatement;
 import org.apache.asterix.lang.common.statement.FeedPolicyDropStatement;
@@ -130,6 +129,7 @@ import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.QuantifiedPair;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
@@ -872,7 +872,7 @@ Statement DropStatement() throws ParseException:
   (
     <DATASET> pairId = QualifiedName() ifExists = IfExists()
       {
-        stmt = new DropStatement(pairId.first, pairId.second, ifExists);
+        stmt = new DropDatasetStatement(pairId.first, pairId.second, ifExists);
       }
     | <INDEX> tripleId = DoubleQualifiedName() ifExists = IfExists()
       {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Expression.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Expression.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Expression.java
index b1df329..7da34fe 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Expression.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Expression.java
@@ -38,7 +38,6 @@ public interface Expression extends ILangExpression {
         UNION_EXPRESSION,
         SELECT_EXPRESSION,
         PRIMARY_EXPRESSION,
-        VALUE_EXPRESSION,
         INDEPENDENT_SUBQUERY,
         CASE_EXPRESSION
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index fcc1080..b12195f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -34,6 +34,28 @@ public interface Statement extends ILangExpression {
      */
     public byte getKind();
 
+    /**
+     *  get a byte representing the statement category.
+     *  Each category describes the type of modifications this statement does.
+     *
+     * @return kind byte
+     */
+    public byte getCategory();
+
+    public class Category {
+        /** no modifications */
+        public static final byte QUERY = 0x01;
+        /** modify data */
+        public static final byte UPDATE = 0x02;
+        /** modify metadata */
+        public static final byte DDL = 0x04;
+        /** modify anything */
+        public static final byte PROCEDURE = 0x08;
+
+        private Category() {
+        }
+    }
+
     public class Kind {
         public static final byte DATASET_DECL = 0x00;
         public static final byte DATAVERSE_DECL = 0x01;
@@ -70,6 +92,7 @@ public interface Statement extends ILangExpression {
         public static final byte COMPACT = 0x20;
         public static final byte EXTERNAL_DATASET_REFRESH = 0x21;
         public static final byte RUN = 0x22;
+        public static final byte EXTENSION = 0x23;
 
         private Kind() {
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
index 912ac6f..94aa1ac 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
@@ -50,4 +50,9 @@ public class CompactStatement implements Statement {
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visit(this, arg);
     }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 48bc59e..fd1ca5e 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -102,4 +102,9 @@ public class ConnectFeedStatement implements Statement {
         return feedName;
     }
 
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index c6dff3f..a475051 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -57,4 +57,9 @@ public class CreateDataverseStatement implements Statement {
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visit(this, arg);
     }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
index dfc6c7e..75fc556 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
@@ -87,4 +87,9 @@ public class CreateFeedPolicyStatement implements Statement {
         return description;
     }
 
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
index c4f9efb..54b6a94 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
@@ -57,4 +57,8 @@ public abstract class CreateFeedStatement implements Statement {
     @Override
     public abstract <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException;
 
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 02a2b0c..5a75f43 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -75,4 +75,9 @@ public class CreateFunctionStatement implements Statement {
         return visitor.visit(this, arg);
     }
 
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
index fd787ab..7f36c45 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
@@ -139,4 +139,9 @@ public class CreateIndexStatement implements Statement {
         return visitor.visit(this, arg);
     }
 
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index b2f3fe1..bf27988 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -21,11 +21,11 @@ package org.apache.asterix.lang.common.statement;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.utils.MetadataConstants;
 
 public class DatasetDecl implements Statement {
     protected final Identifier name;
@@ -153,4 +153,9 @@ public class DatasetDecl implements Statement {
         return dataverse;
     }
 
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
 }


Mime
View raw message