asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [22/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
Date Thu, 14 Jan 2016 20:32:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 44af0ff..d67ca0d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -46,7 +47,6 @@ import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index 4d887dc..418d143 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -31,12 +31,12 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
index c6e04df..4fae7e9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -26,15 +26,15 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.IClusterEventsSubscriber;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.feeds.CentralFeedManager;
+import org.apache.asterix.feed.CentralFeedManager;
 import org.apache.asterix.file.ExternalIndexingOperations;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index d2164f4..2a7b3e4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -37,12 +37,12 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.feeds.api.ICentralFeedManager;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.event.service.ILookupService;
-import org.apache.asterix.feeds.CentralFeedManager;
-import org.apache.asterix.feeds.FeedLifecycleListener;
+import org.apache.asterix.external.feed.api.ICentralFeedManager;
+import org.apache.asterix.feed.CentralFeedManager;
+import org.apache.asterix.feed.FeedLifecycleListener;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index 01775ab..b0dfd58 100755
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -36,6 +36,8 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.external.library.ExternalLibrary;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.external.library.LibraryAdapter;
@@ -44,10 +46,8 @@ import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Library;
-import org.apache.asterix.metadata.feeds.AdapterIdentifier;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
 public class ExternalLibraryBootstrap {
@@ -210,7 +210,8 @@ public class ExternalLibraryBootstrap {
                     String adapterFactoryClass = adapter.getFactoryClass().trim();
                     String adapterName = libraryName + "#" + adapter.getName().trim();
                     AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
-                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, AdapterType.EXTERNAL);
+                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass,
+                            IDataSourceAdapter.AdapterType.EXTERNAL);
                     MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("Installed adapter: " + adapterName);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
index 2d443c7..d5f1a51 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
@@ -18,29 +18,23 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
-import org.apache.asterix.feeds.CentralFeedManager;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.feed.CentralFeedManager;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 
 public class FeedBootstrap {
 
-    public final static String FEEDS_METADATA_DV = "feeds_metadata";
-    public final static String FAILED_TUPLE_DATASET = "failed_tuple";
-    public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
-    public final static String FAILED_TUPLE_DATASET_KEY = "id";
-
     public static void setUpInitialArtifacts() throws Exception {
 
         StringBuilder builder = new StringBuilder();
         try {
-            builder.append("create dataverse " + FEEDS_METADATA_DV + ";" + "\n");
-            builder.append("use dataverse " + FEEDS_METADATA_DV + ";" + "\n");
-
-            builder.append("create type " + FAILED_TUPLE_DATASET_TYPE + " as open { ");
-
-            String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple",
-                    "message", "timestamp" };
+            builder.append("create dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("use dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("create type " + FeedConstants.FAILED_TUPLE_DATASET_TYPE + " as open { ");
+            String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
+                    "timestamp" };
             IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
 
@@ -52,9 +46,9 @@ public class FeedBootstrap {
                 builder.append(fieldTypes[i].getTypeName());
             }
             builder.append("}" + ";" + "\n");
-
-            builder.append("create dataset " + FAILED_TUPLE_DATASET + " " + "(" + FAILED_TUPLE_DATASET_TYPE + ")" + " "
-                    + "primary key " + FAILED_TUPLE_DATASET_KEY + " on  " + MetadataConstants.METADATA_NODEGROUP_NAME
+            builder.append("create dataset " + FeedConstants.FAILED_TUPLE_DATASET + " " + "("
+                    + FeedConstants.FAILED_TUPLE_DATASET_TYPE + ")" + " " + "primary key "
+                    + FeedConstants.FAILED_TUPLE_DATASET_KEY + " on  " + MetadataConstants.METADATA_NODEGROUP_NAME
                     + ";");
 
             CentralFeedManager.AQLExecutor.executeAQL(builder.toString());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 76e7856..3341387 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.NCMessageBroker;
@@ -46,7 +47,6 @@ import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -114,7 +114,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         runtimeContext.initialize(initialRun);
         ncApplicationContext.setApplicationObject(runtimeContext);
 
-        //if replication is enabled, check if there is a replica for this node
+        //If replication is enabled, check if there is a replica for this node
         AsterixReplicationProperties asterixReplicationProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getReplicationProperties();
 
@@ -123,7 +123,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         if (initialRun) {
             LOGGER.info("System is being initialized. (first run)");
         } else {
-            // #. recover if the system is corrupted by checking system state.
+            //#. recover if the system is corrupted by checking system state.
             IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
             systemState = recoveryMgr.getSystemState();
 
@@ -133,7 +133,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
             if (replicationEnabled) {
                 if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
-                    //try to perform remote recovery
+                    //Try to perform remote recovery
                     IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
                     remoteRecoveryMgr.performRemoteRecovery();
                     performedRemoteRecovery = true;
@@ -152,20 +152,20 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     }
 
     private void startReplicationService() throws IOException {
-        //open replication channel
+        //Open replication channel
         runtimeContext.getReplicationChannel().start();
 
-        //check the state of remote replicas
+        //Check the state of remote replicas
         runtimeContext.getReplicationManager().initializeReplicasState();
 
         if (performedRemoteRecovery) {
-            //notify remote replicas about the new IP Address if changed
+            //Notify remote replicas about the new IP Address if changed
             //Note: this is a hack since each node right now maintains its own copy of the cluster configuration.
             //Once the configuration is centralized on the CC, this step wont be needed.
             runtimeContext.getReplicationManager().broadcastNewIPAddress();
         }
 
-        //start replication after the state of remote replicas has been initialized. 
+        //Start replication after the state of remote replicas has been initialized.
         runtimeContext.getReplicationManager().startReplicationThreads();
     }
 
@@ -182,7 +182,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                 MetadataBootstrap.stopUniverse();
             }
 
-            //clean any temporary files
+            //Clean any temporary files
             performLocalCleanUp();
 
             //Note: stopping recovery manager will make a sharp checkpoint
@@ -197,7 +197,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
     @Override
     public void notifyStartupComplete() throws Exception {
-        //send max resource id on this NC to the CC
+        //Send max resource id on this NC to the CC
         ((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
 
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
@@ -228,9 +228,9 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                 throw new IllegalStateException("Metadata node cannot access distributed state");
             }
 
-            // This is a special case, we just give the metadataNode directly.
-            // This way we can delay the registration of the metadataNode until
-            // it is completely initialized.
+            //This is a special case, we just give the metadataNode directly.
+            //This way we can delay the registration of the metadataNode until
+            //it is completely initialized.
             MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
             MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
                     systemState == SystemState.NEW_UNIVERSE);
@@ -272,26 +272,26 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             proxy.setMetadataNode(stub);
         }
 
-        //clean any temporary files
+        //Clean any temporary files
         performLocalCleanUp();
     }
 
     private void performLocalCleanUp() {
-        //delete working area files from failed jobs
+        //Delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();
 
-        //reclaim storage for temporary datasets.
+        //Reclaim storage for temporary datasets.
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
         for (String ioDevice : ioDevices) {
             String tempDatasetsDir = ioDevice + storageDirName + File.separator
-                    + SplitsAndConstraintsUtil.TEMP_DATASETS_STORAGE_FOLDER;
+                    + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
             FileUtils.deleteQuietly(new File(tempDatasetsDir));
         }
 
-        // TODO
-        //reclaim storage for orphaned index artifacts in NCs.
+        //TODO
+        //Reclaim storage for orphaned index artifacts in NCs.
         //Note: currently LSM indexes invalid components are deleted when an index is activated.
     }
 
@@ -321,7 +321,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                     String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
                     String[] ioDevicePaths = nodeIoDevices.trim().split(",");
                     for (int i = 0; i < ioDevicePaths.length; i++) {
-                        //construct full store path
+                        // construct full store path
                         ioDevicePaths[i] += File.separator + storeDir;
                     }
                     metadataProperties.getStores().put(nodeId, ioDevicePaths);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
index 8ff6d9b..83beb7c 100644
--- a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -36,7 +36,7 @@ import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.feeds.CentralFeedManager;
+import org.apache.asterix.feed.CentralFeedManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm b/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
index c4dde05..aabf05d 100644
--- a/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
+++ b/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
@@ -1,3 +1 @@
-{ "DataverseName": "Metadata", "Name": "adapter", "Classname": "org.apache.asterix.external.adapter.factory.GenericAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 03 15:39:35 AST 2016" }
-{ "DataverseName": "Metadata", "Name": "socket_adapter", "Classname": "org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 03 15:39:35 AST 2016" }
-{ "DataverseName": "Metadata", "Name": "socket_client", "Classname": "org.apache.asterix.external.runtime.SocketClientAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 03 15:39:35 AST 2016" }
\ No newline at end of file
+{ "DataverseName": "Metadata", "Name": "adapter", "Classname": "org.apache.asterix.external.adapter.factory.GenericAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 10 16:13:18 AST 2016" }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index f981aca..f3e4605 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index e14b558..375d05a 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -27,6 +27,82 @@
         ResultOffsetPath="results"
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
+    <test-group name="feeds">
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_01">
+                <output-dir compare="Text">feeds_01</output-dir>
+            </compilation-unit>
+        </test-case>
+        <!--Disable it because of sporadic failures. Abdullah will re-enable it.
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_02">
+                <output-dir compare="Text">feeds_02</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_03">
+                <output-dir compare="Text">feeds_03</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_04">
+                <output-dir compare="Text">feeds_04</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+          <compilation-unit name="feeds_06">
+            <output-dir compare="Text">feeds_06</output-dir>
+          </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_07">
+                <output-dir compare="Text">feeds_07</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_08">
+                <output-dir compare="Text">feeds_08</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_09">
+                <output-dir compare="Text">feeds_09</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_10">
+                <output-dir compare="Text">feeds_10</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_11">
+                <output-dir compare="Text">feeds_11</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_12">
+                <output-dir compare="Text">feeds_12</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="issue_230_feeds">
+                <output-dir compare="Text">issue_230_feeds</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="issue_711_feeds">
+                <output-dir compare="Text">issue_711_feeds</output-dir>
+            </compilation-unit>
+        </test-case>
+        -->
+
+    </test-group>
     <test-group name="flwor">
         <test-case FilePath="flwor">
             <compilation-unit name="at00">
@@ -6137,83 +6213,6 @@
             </compilation-unit>
         </test-case>
     </test-group>
-    <test-group name="feeds">
-
-        <!--Disable it because of sporadic failures. Raman will re-enable it.
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_01">
-                <output-dir compare="Text">feeds_01</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_02">
-                <output-dir compare="Text">feeds_02</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_03">
-                <output-dir compare="Text">feeds_03</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_04">
-                <output-dir compare="Text">feeds_04</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-          <compilation-unit name="feeds_06">
-            <output-dir compare="Text">feeds_06</output-dir>
-          </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_07">
-                <output-dir compare="Text">feeds_07</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_08">
-                <output-dir compare="Text">feeds_08</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_09">
-                <output-dir compare="Text">feeds_09</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_10">
-                <output-dir compare="Text">feeds_10</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_11">
-                <output-dir compare="Text">feeds_11</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_12">
-                <output-dir compare="Text">feeds_12</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="issue_230_feeds">
-                <output-dir compare="Text">issue_230_feeds</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="issue_711_feeds">
-                <output-dir compare="Text">issue_711_feeds</output-dir>
-            </compilation-unit>
-        </test-case>
-        -->
-
-    </test-group>
     <test-group name="hdfs">
         <test-case FilePath="hdfs">
             <compilation-unit name="large-record">

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index b8c3f2f..3386252 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedManager;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
@@ -79,7 +78,7 @@ public interface IAsterixAppRuntimeContext {
 
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
 
-    public IFeedManager getFeedManager();
+    public Object getFeedManager();
 
     public IRemoteRecoveryManager getRemoteRecoveryManager();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index cc7ec84..13ce403 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -197,7 +197,7 @@ public class AsterixPropertiesAccessor {
     }
 
     public ClusterPartition getMetadataPartiton() {
-        //metadata partition is always the first partition on the metadata node
+        // metadata partition is always the first partition on the metadata node
         return nodePartitionsMap.get(metadataNodeName)[0];
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java b/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
new file mode 100644
index 0000000..943e385
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.config;
+
+public class MetadataConstants {
+
+    // Name of the dataverse the metadata lives in.
+    public final static String METADATA_DATAVERSE_NAME = "Metadata";
+
+    // Name of the node group where metadata is stored on.
+    public final static String METADATA_NODEGROUP_NAME = "MetadataGroup";
+
+    // Name of the default nodegroup where internal/feed datasets will be partitioned
+    // if an explicit nodegroup is not specified at the time of creation of a dataset
+    public static final String METADATA_DEFAULT_NODEGROUP_NAME = "DEFAULT_NG_ALL_NODES";
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index fd1ebb8..d25e51f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.dataflow;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.FrameDataException;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -41,6 +42,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
 
     private final boolean isPrimary;
     private AbstractLSMIndex lsmIndex;
+    private int i = 0;
 
     public boolean isPrimary() {
         return isPrimary;
@@ -85,7 +87,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
         ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
         try {
-            for (int i = 0; i < tupleCount; i++) {
+            for (; i < tupleCount; i++) {
                 if (tupleFilter != null) {
                     frameTuple.reset(accessor, i);
                     if (!tupleFilter.accept(frameTuple)) {
@@ -117,11 +119,13 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
                 }
             }
         } catch (Throwable th) {
-            throw new HyracksDataException(th);
+            FrameDataException fde = new FrameDataException(i, th);
+            throw fde;
         }
         writeBuffer.ensureFrameSize(buffer.capacity());
         FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
         FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+        i = 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java b/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
index 136a196..18b5264 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
@@ -26,7 +26,7 @@ public class FrameDataException extends HyracksDataException {
 
     private final int tupleIndex;
 
-    public FrameDataException(int tupleIndex, Exception cause) {
+    public FrameDataException(int tupleIndex, Throwable cause) {
         super(cause);
         this.tupleIndex = tupleIndex;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java
deleted file mode 100644
index 70833fc..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class BasicMonitoredBuffer extends MonitoredBuffer {
-
-    public BasicMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter, FrameTupleAccessor fta,
-            RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
-            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
-            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
-        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
-                exceptionHandler, callback, nPartitions, policyAccessor);
-    }
-
-    @Override
-    protected boolean monitorProcessingRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean logInflowOutflowRate() {
-        return false;
-    }
-
-    @Override
-    protected IFramePreprocessor getFramePreProcessor() {
-        return null;
-    }
-
-    @Override
-    protected IFramePostProcessor getFramePostProcessor() {
-        return null;
-    }
-
-    @Override
-    protected boolean monitorInputQueueLength() {
-        return false;
-    }
-
-    @Override
-    protected boolean reportInflowRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean reportOutflowRate() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
deleted file mode 100644
index 9865501..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.FeedFrameCollector.State;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Represents the feed runtime that collects feed tuples from another feed.
- * In case of a primary feed, the CollectionRuntime collects tuples from the feed
- * intake job. For a secondary feed, tuples are collected from the intake/compute
- * runtime associated with the source feed.
- */
-public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
-
-    private final FeedConnectionId connectionId;
-    private final ISubscribableRuntime sourceRuntime;
-    private final Map<String, String> feedPolicy;
-    private FeedFrameCollector frameCollector;
-
-    public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
-            Map<String, String> feedPolicy) {
-        super(runtimeId, inputSideHandler, outputSideWriter);
-        this.connectionId = connectionId;
-        this.sourceRuntime = sourceRuntime;
-        this.feedPolicy = feedPolicy;
-    }
-
-    public State waitTillCollectionOver() throws InterruptedException {
-        if (!(isCollectionOver())) {
-            synchronized (frameCollector) {
-                while (!isCollectionOver()) {
-                    frameCollector.wait();
-                }
-            }
-        }
-        return frameCollector.getState();
-    }
-
-    private boolean isCollectionOver() {
-        return frameCollector.getState().equals(FeedFrameCollector.State.FINISHED)
-                || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
-    }
-
-    @Override
-    public void setMode(Mode mode) {
-        getInputHandler().setMode(mode);
-    }
-
-    @Override
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public ISubscribableRuntime getSourceRuntime() {
-        return sourceRuntime;
-    }
-
-    public void setFrameCollector(FeedFrameCollector frameCollector) {
-        this.frameCollector = frameCollector;
-    }
-
-    @Override
-    public FeedFrameCollector getFrameCollector() {
-        return frameCollector;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java
deleted file mode 100644
index 7ec3fdf..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class ComputeSideMonitoredBuffer extends MonitoredBuffer {
-
-    public ComputeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
-            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
-            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
-        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
-                exceptionHandler, callback, nPartitions, policyAccessor);
-    }
-
-    @Override
-    protected boolean monitorProcessingRate() {
-        return true;
-    }
-
-    protected boolean logInflowOutflowRate() {
-        return true;
-    }
-
-    @Override
-    protected boolean monitorInputQueueLength() {
-        return true;
-    }
-
-    @Override
-    protected IFramePreprocessor getFramePreProcessor() {
-        return null;
-    }
-
-    @Override
-    protected IFramePostProcessor getFramePostProcessor() {
-        return null;
-    }
-
-    @Override
-    protected boolean reportOutflowRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean reportInflowRate() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java
deleted file mode 100644
index ccd6547..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DataBucket {
-
-    private static final AtomicInteger globalBucketId = new AtomicInteger(0);
-
-    private final ByteBuffer content;
-    private final AtomicInteger readCount;
-    private final int bucketId;
-
-    private int desiredReadCount;
-    private ContentType contentType;
-
-    private final DataBucketPool pool;
-
-    public enum ContentType {
-        DATA, // data (feed tuple)
-        EOD, // A signal indicating that there shall be no more data
-        EOSD // End of processing of spilled data
-    }
-
-    public DataBucket(DataBucketPool pool) {
-        this.content = ByteBuffer.allocate(pool.getFrameSize());
-        this.readCount = new AtomicInteger(0);
-        this.pool = pool;
-        this.contentType = ContentType.DATA;
-        this.bucketId = globalBucketId.incrementAndGet();
-    }
-
-    public synchronized void reset(ByteBuffer frame) {
-        if (frame != null) {
-            content.flip();
-            System.arraycopy(frame.array(), 0, content.array(), 0, frame.limit());
-            content.limit(frame.limit());
-            content.position(0);
-        }
-    }
-
-    public synchronized void doneReading() {
-        if (readCount.incrementAndGet() == desiredReadCount) {
-            readCount.set(0);
-            pool.returnDataBucket(this);
-        }
-    }
-
-    public void setDesiredReadCount(int rCount) {
-        this.desiredReadCount = rCount;
-    }
-
-    public ContentType getContentType() {
-        return contentType;
-    }
-
-    public void setContentType(ContentType contentType) {
-        this.contentType = contentType;
-    }
-
-    public synchronized ByteBuffer getContent() {
-        return content;
-    }
-
-    @Override
-    public String toString() {
-        return "DataBucket [" + bucketId + "]" + " (" + readCount + "," + desiredReadCount + ")";
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java
deleted file mode 100644
index 2e7e60c..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.Stack;
-
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-
-/**
- * Represents a pool of reusable {@link DataBucket}
- */
-public class DataBucketPool implements IFeedMemoryComponent {
-
-    /** A unique identifier for the memory component **/
-    private final int componentId;
-
-    /** The {@link IFeedMemoryManager} for the NodeController **/
-    private final IFeedMemoryManager memoryManager;
-
-    /** A collection of available data buckets {@link DataBucket} **/
-    private final Stack<DataBucket> pool;
-
-    /** The total number of data buckets {@link DataBucket} allocated **/
-    private int totalAllocation;
-
-    /** The fixed frame size as configured for the asterix runtime **/
-    private final int frameSize;
-
-    public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
-        this.componentId = componentId;
-        this.memoryManager = memoryManager;
-        this.pool = new Stack<DataBucket>();
-        this.frameSize = frameSize;
-        expand(size);
-    }
-
-    public synchronized void returnDataBucket(DataBucket bucket) {
-        pool.push(bucket);
-    }
-
-    public synchronized DataBucket getDataBucket() {
-        if (pool.size() == 0) {
-            if (!memoryManager.expandMemoryComponent(this)) {
-                return null;
-            }
-        }
-        return pool.pop();
-    }
-
-    @Override
-    public Type getType() {
-        return Type.POOL;
-    }
-
-    @Override
-    public int getTotalAllocation() {
-        return totalAllocation;
-    }
-
-    @Override
-    public int getComponentId() {
-        return componentId;
-    }
-
-    @Override
-    public void expand(int delta) {
-        for (int i = 0; i < delta; i++) {
-            DataBucket bucket = new DataBucket(this);
-            pool.add(bucket);
-        }
-        totalAllocation += delta;
-    }
-
-    @Override
-    public void reset() {
-        totalAllocation -= pool.size();
-        pool.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "DataBucketPool" + "[" + componentId + "]" + "(" + totalAllocation + ")";
-    }
-
-    public int getSize() {
-        return pool.size();
-    }
-
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
deleted file mode 100644
index d0e371e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler.Type;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Provides mechanism for distributing the frames, as received from an operator to a
- * set of registered readers. Each reader typically operates at a different pace. Readers
- * are isolated from each other to ensure that a slow reader does not impact the progress of
- * others.
- **/
-public class DistributeFeedFrameWriter implements IFrameWriter {
-
-    private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
-
-    /** A unique identifier for the feed to which the incoming tuples belong. **/
-    private final FeedId feedId;
-
-    /** An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each operating in isolation. **/
-    private final FrameDistributor frameDistributor;
-
-    /** The original frame writer instantiated as part of job creation **/
-    private IFrameWriter writer;
-
-    /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
-    private final FeedRuntimeType feedRuntimeType;
-
-    /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
-    private final int partition;
-
-    public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
-            FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
-                    throws IOException {
-        this.feedId = feedId;
-        this.frameDistributor = new FrameDistributor(feedId, feedRuntimeType, partition, true,
-                feedManager.getFeedMemoryManager(), fta);
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.writer = writer;
-    }
-
-    public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
-            FeedConnectionId connectionId) throws Exception {
-        FeedFrameCollector collector = null;
-        if (!frameDistributor.isRegistered(frameWriter)) {
-            collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
-            frameDistributor.registerFrameCollector(collector);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
-            }
-            return collector;
-        } else {
-            throw new IllegalStateException("subscriber " + feedId + " already registered");
-        }
-    }
-
-    public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
-        boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
-        if (!success) {
-            throw new IllegalStateException(
-                    "Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter + " not registered.");
-        }
-    }
-
-    public void notifyEndOfFeed() {
-        frameDistributor.notifyEndOfFeed();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            frameDistributor.close();
-        } finally {
-            writer.close();
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameDistributor.nextFrame(frame);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-    }
-
-    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
-        return frameDistributor.getRegisteredReaders();
-    }
-
-    public void setWriter(IFrameWriter writer) {
-        this.writer = writer;
-    }
-
-    public Type getType() {
-        return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
-    }
-
-    @Override
-    public String toString() {
-        return feedId.toString() + feedRuntimeType + "[" + partition + "]";
-    }
-
-    public FrameDistributor.DistributionMode getDistributionMode() {
-        return frameDistributor.getDistributionMode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java
deleted file mode 100644
index 11130a1..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.common.feeds;
-
-import java.util.Map;
-
-public class FeedActivity implements Comparable<FeedActivity> {
-
-    private int activityId;
-
-    private final String dataverseName;
-    private final String datasetName;
-    private final String feedName;
-    private final Map<String, String> feedActivityDetails;
-
-    public static class FeedActivityDetails {
-        public static final String INTAKE_LOCATIONS = "intake-locations";
-        public static final String COMPUTE_LOCATIONS = "compute-locations";
-        public static final String STORAGE_LOCATIONS = "storage-locations";
-        public static final String COLLECT_LOCATIONS = "collect-locations";
-        public static final String FEED_POLICY_NAME = "feed-policy-name";
-        public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
-
-    }
-
-    public FeedActivity(String dataverseName, String feedName, String datasetName,
-            Map<String, String> feedActivityDetails) {
-        this.dataverseName = dataverseName;
-        this.feedName = feedName;
-        this.datasetName = datasetName;
-        this.feedActivityDetails = feedActivityDetails;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FeedActivity)) {
-            return false;
-        }
-
-        if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).datasetName.equals(datasetName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).getFeedName().equals(feedName)) {
-            return false;
-        }
-        if (((FeedActivity) other).getActivityId() != (activityId)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
-    }
-
-    public String getConnectTimestamp() {
-        return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
-    }
-
-    public int getActivityId() {
-        return activityId;
-    }
-
-    public void setActivityId(int activityId) {
-        this.activityId = activityId;
-    }
-
-    public Map<String, String> getFeedActivityDetails() {
-        return feedActivityDetails;
-    }
-
-    @Override
-    public int compareTo(FeedActivity o) {
-        return o.getActivityId() - this.activityId;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
deleted file mode 100644
index 97dc4f8..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
-
-    private final FeedFrameCache feedFrameCache;
-
-    public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
-            throws IOException {
-        super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
-                nPartitions);
-        this.feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
-    }
-
-    public void process(ByteBuffer frame) throws HyracksDataException {
-        feedFrameCache.sendMessage(frame);
-        super.process(frame);
-    }
-
-    public void replayFrom(int recordId) throws HyracksDataException {
-        feedFrameCache.replayRecords(recordId);
-    }
-
-    public void dropTill(int recordId) {
-        feedFrameCache.dropTillRecordId(recordId);
-    }
-    
-    public void replayCached() throws HyracksDataException{
-        feedFrameCache.replayAll();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java
deleted file mode 100644
index 4d6a427..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedConnectJobInfo extends FeedJobInfo {
-
-    private final FeedConnectionId connectionId;
-    private final Map<String, String> feedPolicy;
-    private final IFeedJoint sourceFeedJoint;
-    private IFeedJoint computeFeedJoint;
-
-    private List<String> collectLocations;
-    private List<String> computeLocations;
-    private List<String> storageLocations;
-
-    public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
-            IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
-            Map<String, String> feedPolicy) {
-        super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
-        this.connectionId = connectionId;
-        this.sourceFeedJoint = sourceFeedJoint;
-        this.computeFeedJoint = computeFeedJoint;
-        this.feedPolicy = feedPolicy;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public List<String> getCollectLocations() {
-        return collectLocations;
-    }
-
-    public List<String> getComputeLocations() {
-        return computeLocations;
-    }
-
-    public List<String> getStorageLocations() {
-        return storageLocations;
-    }
-
-    public void setCollectLocations(List<String> collectLocations) {
-        this.collectLocations = collectLocations;
-    }
-
-    public void setComputeLocations(List<String> computeLocations) {
-        this.computeLocations = computeLocations;
-    }
-
-    public void setStorageLocations(List<String> storageLocations) {
-        this.storageLocations = storageLocations;
-    }
-
-    public IFeedJoint getSourceFeedJoint() {
-        return sourceFeedJoint;
-    }
-
-    public IFeedJoint getComputeFeedJoint() {
-        return computeFeedJoint;
-    }
-
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
-        this.computeFeedJoint = computeFeedJoint;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java
deleted file mode 100644
index 355d340..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a dataset.
- */
-public class FeedConnectionId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedId feedId;
-    private final String datasetName;
-
-    public FeedConnectionId(FeedId feedId, String datasetName) {
-        this.feedId = feedId;
-        this.datasetName = datasetName;
-    }
-
-    public FeedConnectionId(String dataverse, String feedName, String datasetName) {
-        this.feedId = new FeedId(dataverse, feedName);
-        this.datasetName = datasetName;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !(o instanceof FeedConnectionId)) {
-            return false;
-        }
-
-        if (this == o
-                || (((FeedConnectionId) o).getFeedId().equals(feedId) && ((FeedConnectionId) o).getDatasetName()
-                        .equals(datasetName))) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return feedId.toString() + "-->" + datasetName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java
deleted file mode 100644
index 6230eac..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-
-/**
- * A request for connecting a feed to a dataset.
- */
-public class FeedConnectionRequest {
-
-    public enum ConnectionStatus {
-        /** initial state upon creating a connection request **/
-        INITIALIZED,
-
-        /** connection establish; feed is receiving data **/
-        ACTIVE,
-
-        /** connection removed; feed is not receiving data **/
-        INACTIVE,
-
-        /** connection request failed **/
-        FAILED
-    }
-
-    /** Feed joint on the feed pipeline that serves as the source for this subscription **/
-    private final FeedJointKey feedJointKey;
-
-    /** Location in the source feed pipeline from where feed tuples are received. **/
-    private final ConnectionLocation connectionLocation;
-
-    /** List of functions that need to be applied in sequence after the data hand-off at the source feedPointKey. **/
-    private final List<String> functionsToApply;
-
-    /** Status associated with the subscription. */
-    private ConnectionStatus connectionStatus;
-
-    /** Name of the policy that governs feed ingestion **/
-    private final String policy;
-
-    /** Policy associated with a feed connection **/
-    private final Map<String, String> policyParameters;
-
-    /** Target dataset associated with the connection request **/
-    private final String targetDataset;
-
-    private final FeedId receivingFeedId;
-
-    
-    public FeedConnectionRequest(FeedJointKey feedPointKey, ConnectionLocation connectionLocation,
-            List<String> functionsToApply, String targetDataset, String policy, Map<String, String> policyParameters,
-            FeedId receivingFeedId) {
-        this.feedJointKey = feedPointKey;
-        this.connectionLocation = connectionLocation;
-        this.functionsToApply = functionsToApply;
-        this.targetDataset = targetDataset;
-        this.policy = policy;
-        this.policyParameters = policyParameters;
-        this.receivingFeedId = receivingFeedId;
-        this.connectionStatus = ConnectionStatus.INITIALIZED;
-    }
-
-    public FeedJointKey getFeedJointKey() {
-        return feedJointKey;
-    }
-
-    public ConnectionStatus getConnectionStatus() {
-        return connectionStatus;
-    }
-
-    public void setSubscriptionStatus(ConnectionStatus connectionStatus) {
-        this.connectionStatus = connectionStatus;
-    }
-
-    public String getPolicy() {
-        return policy;
-    }
-
-    public String getTargetDataset() {
-        return targetDataset;
-    }
-
-    public ConnectionLocation getSubscriptionLocation() {
-        return connectionLocation;
-    }
-
-    public FeedId getReceivingFeedId() {
-        return receivingFeedId;
-    }
-
-    public Map<String, String> getPolicyParameters() {
-        return policyParameters;
-    }
-
-    public List<String> getFunctionsToApply() {
-        return functionsToApply;
-    }
-
-    @Override
-    public String toString() {
-        return "Feed Connection Request " + feedJointKey + " [" + connectionLocation + "]" + " Apply ("
-                + StringUtils.join(functionsToApply, ",") + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java
deleted file mode 100644
index 05e554b..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-public class FeedConstants {
-
-    public static final class StatisticsConstants {
-        public static final String INTAKE_TUPLEID = "intake-tupleid";
-        public static final String INTAKE_PARTITION = "intake-partition";
-        public static final String INTAKE_TIMESTAMP = "intake-timestamp";
-        public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
-        public static final String STORE_TIMESTAMP = "store-timestamp";
-
-    }
-
-    public static final class MessageConstants {
-        public static final String MESSAGE_TYPE = "message-type";
-        public static final String NODE_ID = "nodeId";
-        public static final String DATAVERSE = "dataverse";
-        public static final String FEED = "feed";
-        public static final String DATASET = "dataset";
-        public static final String AQL = "aql";
-        public static final String RUNTIME_TYPE = "runtime-type";
-        public static final String PARTITION = "partition";
-        public static final String INTAKE_PARTITION = "intake-partition";
-        public static final String INFLOW_RATE = "inflow-rate";
-        public static final String OUTFLOW_RATE = "outflow-rate";
-        public static final String MODE = "mode";
-        public static final String CURRENT_CARDINALITY = "current-cardinality";
-        public static final String REDUCED_CARDINALITY = "reduced-cardinality";
-        public static final String VALUE_TYPE = "value-type";
-        public static final String VALUE = "value";
-        public static final String CPU_LOAD = "cpu-load";
-        public static final String N_RUNTIMES = "n_runtimes";
-        public static final String HEAP_USAGE = "heap_usage";
-        public static final String OPERAND_ID = "operand-id";
-        public static final String COMPUTE_PARTITION_RETAIN_LIMIT = "compute-partition-retain-limit";
-        public static final String LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP = "last-persisted-tuple-intake_timestamp";
-        public static final String PERSISTENCE_DELAY_WITHIN_LIMIT = "persistence-delay-within-limit";
-        public static final String AVERAGE_PERSISTENCE_DELAY = "average-persistence-delay";
-        public static final String COMMIT_ACKS = "commit-acks";
-        public static final String MAX_WINDOW_ACKED = "max-window-acked";
-        public static final String BASE = "base";
-        public static final String NOT_APPLICABLE = "N/A";
-        
-    }
-
-    public static final class NamingConstants {
-        public static final String LIBRARY_NAME_SEPARATOR = "#";
-    }
-
-    public static class JobConstants {
-        public static final int DEFAULT_FRAME_SIZE = 8192;
-    }
-}


Mime
View raw message