asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection
Date Fri, 26 Feb 2016 14:00:27 GMT
abdullah alamoudi has submitted this change and it was merged.

Change subject: ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection
......................................................................


ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection

A bug caused a read lock to never be released when a feed was
connected with "wait-for-completion" set to false. The bug
was fixed and a test case was added.
Another bug was causing the socket feed to not receive
connections correctly. The bug was fixed and a test case
was added.
Additionally, this change ensures that adapters have absolute
partitions, for consistency with regard to the feed log
manager.

Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Reviewed-on: https://asterix-gerrit.ics.uci.edu/660
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ildar Absalyamov <ildar.absalyamov@gmail.com>
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
A asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
M asterix-app/src/test/resources/runtimets/testsuite.xml
A asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
M asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
50 files changed, 1,183 insertions(+), 216 deletions(-)

Approvals:
  Ildar Absalyamov: Looks good to me, approved
  Jenkins: Verified



diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..be9452b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,8 @@
             if (tempPath.endsWith(File.separator)) {
                 tempPath = tempPath.substring(0, tempPath.length() - 1);
             }
-            //get initial partitions from properties
+            System.err.println("Using the path: " + tempPath);
+            // get initial partitions from properties
             String[] nodeStores = propertiesAccessor.getStores().get(ncName);
             if (nodeStores == null) {
                 throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +98,7 @@
                 tempDirPath += File.separator;
             }
             for (int p = 0; p < nodeStores.length; p++) {
-                //create IO devices based on stores
+                // create IO devices based on stores
                 String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
                 File ioDeviceDir = new File(iodevicePath);
                 ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index c0245d7..5cd490a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedMessage;
@@ -37,9 +38,11 @@
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
 import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.file.JobSpecificationUtils;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -49,6 +52,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 
 /**
@@ -251,4 +257,17 @@
                 completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
         return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
     }
+
+    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
+                locations);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
+                .splitProviderAndPartitionConstraints(feedLogFileSplits);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
+        spec.addRoot(frod);
+        return spec;
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 9f024e9..ea50221 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@
         ASYNC_DEFERRED
     }
 
-    public static final boolean IS_DEBUG_MODE = false;//true
+    public static final boolean IS_DEBUG_MODE = false;// true
     private final List<Statement> statements;
     private final SessionConfig sessionConfig;
     private Dataverse activeDefaultDataverse;
@@ -593,8 +593,9 @@
                     }
                     if (compactionPolicy == null) {
                         if (filterField != null) {
-                            // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the
-                            // correlated-prefix as the default merge policy.
+                            //If the dataset has a filter and the user didn't specify a merge
+                            //policy, then we will pick the
+                            //correlated-prefix as the default merge policy.
                             compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
                             compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                         }
@@ -659,10 +660,10 @@
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
 
                 //#. execute compensation operations
-                //   remove the index in NC
-                //   [Notice]
-                //   As long as we updated(and committed) metadata, we should remove any effect of the job
-                //   because an exception occurs during runJob.
+                //remove the index in NC
+                //[Notice]
+                //As long as we updated(and committed) metadata, we should remove any effect of the job
+                //because an exception occurs during runJob.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -679,7 +680,7 @@
                     }
                 }
 
-                //   remove the record from the metadata.
+                //remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -803,7 +804,7 @@
         String indexName = null;
         JobSpecification spec = null;
         Dataset ds = null;
-        // For external datasets
+        //For external datasets
         ArrayList<ExternalFile> externalFilesSnapshot = null;
         boolean firstExternalDatasetIndex = false;
         boolean filesIndexReplicated = false;
@@ -880,8 +881,10 @@
                 }
             }
 
-            // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
-            // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
+            //Checks whether a user is trying to create an inverted secondary index on a dataset
+            //with a variable-length primary key.
+            //Currently, we do not support this. Therefore, as a temporary solution, we print an
+            //error message and stop.
             if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -891,7 +894,7 @@
                     IAType keyType = aRecordType.getSubFieldType(partitioningKey);
                     ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 
-                    // If it is not a fixed length
+                    //If it is not a fixed length
                     if (typeTrait.getFixedLength() < 0) {
                         throw new AlgebricksException("The keyword or ngram index -" + indexName
                                 + " cannot be created on the dataset -" + datasetName
@@ -904,27 +907,27 @@
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 validateIfResourceIsActiveInFeed(dataverseName, datasetName);
             } else {
-                // External dataset
-                // Check if the dataset is indexible
+                //External dataset
+                //Check if the dataset is indexible
                 if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
                     throw new AlgebricksException(
                             "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
                                     + " Adapter can't be indexed");
                 }
-                // check if the name of the index is valid
+                //Check if the name of the index is valid
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
                     throw new AlgebricksException("external dataset index name is invalid");
                 }
 
-                // Check if the files index exist
+                //Check if the files index exist
                 filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                         datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                 firstExternalDatasetIndex = (filesIndex == null);
-                // lock external dataset
+                //Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
                 datasetLocked = true;
                 if (firstExternalDatasetIndex) {
-                    // verify that no one has created an index before we acquire the lock
+                    //Verify that no one has created an index before we acquire the lock
                     filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                             dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                     if (filesIndex != null) {
@@ -934,20 +937,20 @@
                     }
                 }
                 if (firstExternalDatasetIndex) {
-                    // Get snapshot from External File System
+                    //Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
-                    // Add an entry for the files index
+                    //Add an entry for the files index
                     filesIndex = new Index(dataverseName, datasetName,
                             ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
                             IMetadataEntity.PENDING_ADD_OP);
                     MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                    // Add files to the external files index
+                    //Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
                     }
-                    // This is the first index for the external dataset, replicate the files index
+                    //This is the first index for the external dataset, replicate the files index
                     spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
                             metadataProvider, true);
                     if (spec == null) {
@@ -1025,13 +1028,14 @@
                     indexName);
             index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            // add another new files index with PendingNoOp after deleting the index with PendingAddOp
+            //add another new files index with PendingNoOp after deleting the index with
+            //PendingAddOp
             if (firstExternalDatasetIndex) {
                 MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                         filesIndex.getIndexName());
                 filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                // update transaction timestamp
+                //update transaction timestamp
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
             }
@@ -1041,7 +1045,7 @@
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
-            // If files index was replicated for external dataset, it should be cleaned up on NC side
+            //If files index was replicated for external dataset, it should be cleaned up on NC side
             if (filesIndexReplicated) {
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
@@ -1063,7 +1067,7 @@
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                 //#. execute compensation operations
-                //   remove the index in NC
+                //remove the index in NC
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1086,7 +1090,7 @@
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        // Drop External Files from metadata
+                        //Drop External Files from metadata
                         MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
@@ -1098,7 +1102,7 @@
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        // Drop the files index from metadata
+                        //Drop the files index from metadata
                         MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                                 datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1110,7 +1114,7 @@
                                 + ") couldn't be removed from the metadata", e);
                     }
                 }
-                // remove the record from the metadata.
+                //remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1183,7 +1187,6 @@
         MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
                 if (stmtDelete.getIfExists()) {
@@ -1216,6 +1219,9 @@
                                     + connection.getDatasetName() + ". Encountered exception " + exception);
                         }
                     }
+                    //prepare job to remove feed log storage
+                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
                 }
             }
 
@@ -1239,7 +1245,7 @@
                     CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                     jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 } else {
-                    // External dataset
+                    //External dataset
                     List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                             datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
@@ -1260,8 +1266,9 @@
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
             //#. mark PendingDropOp on the dataverse record by
-            //   first, deleting the dataverse record from the DATAVERSE_DATASET
-            //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+            //first, deleting the dataverse record from the DATAVERSE_DATASET
+            //second, inserting the dataverse record with the PendingDropOp value into the
+            //DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
@@ -1295,7 +1302,7 @@
                 }
 
                 //#. execute compensation operations
-                //   remove the all indexes in NC
+                //remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
@@ -1305,7 +1312,7 @@
                     e.addSuppressed(e2);
                 }
 
-                //   remove the record from the metadata.
+                //remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
                     MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
@@ -1352,7 +1359,7 @@
 
             Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                // prepare job spec(s) that would disconnect any active feeds involving the dataset.
+                //prepare job spec(s) that would disconnect any active feeds involving the dataset.
                 List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
                 if (feedConnections != null && !feedConnections.isEmpty()) {
                     for (FeedConnectionId connection : feedConnections) {
@@ -1363,6 +1370,10 @@
                             LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
                                     + datasetName + " as dataset is being dropped");
                         }
+                        //prepare job to remove feed log storage
+                        jobsToExecute
+                                .add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx,
+                                        connection.getFeedId().getDataverse(), connection.getFeedId().getFeedName())));
                     }
                 }
 
@@ -1404,7 +1415,7 @@
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             } else {
-                // External dataset
+                //External dataset
                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
                 //#. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
@@ -1447,7 +1458,7 @@
 
             //#. finally, delete the dataset.
             MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-            // Drop the associated nodegroup
+            //Drop the associated nodegroup
             String nodegroup = ds.getNodeGroupName();
             if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
                 MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
@@ -1461,7 +1472,7 @@
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                 //#. execute compensation operations
-                //   remove the all indexes in NC
+                //remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
@@ -1471,7 +1482,7 @@
                     e.addSuppressed(e2);
                 }
 
-                //   remove the record from the metadata.
+                //remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1506,7 +1517,7 @@
         MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
 
         String indexName = null;
-        // For external index
+        //For external index
         boolean dropFilesIndex = false;
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
@@ -1573,7 +1584,7 @@
                 //#. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
             } else {
-                // External dataset
+                //External dataset
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (index == null) {
@@ -1593,7 +1604,7 @@
                         datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
-                    // only one index + the files index, we need to delete both of the indexes
+                    //only one index + the files index, we need to delete both of the indexes
                     for (Index externalIndex : datasetIndexes) {
                         if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
                             cds = new CompiledIndexDropStatement(dataverseName, datasetName,
@@ -1636,7 +1647,7 @@
                 //#. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (dropFilesIndex) {
-                    // delete the files index too
+                    //delete the files index too
                     MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                             ExternalIndexingOperations.getFilesIndexName(datasetName));
                     MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
@@ -1652,7 +1663,7 @@
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                 //#. execute compensation operations
-                //   remove the all indexes in NC
+                //remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
@@ -1662,7 +1673,7 @@
                     e.addSuppressed(e2);
                 }
 
-                //   remove the record from the metadata.
+                //remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1916,11 +1927,11 @@
             ICompiledDmlStatement stmt)
                     throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
-        // Query Rewriting (happens under the same ongoing metadata transaction)
+        //Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig);
 
-        // Query Compilation (happens under the same ongoing metadata transaction)
+        //Query Compilation (happens under the same ongoing metadata transaction)
         JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
                 reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
 
@@ -1930,14 +1941,12 @@
 
     private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
-
         CreateFeedStatement cfs = (CreateFeedStatement) stmt;
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
-
         Feed feed = null;
         try {
             feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -2065,6 +2074,9 @@
                 throw new AlgebricksException("Feed " + feedId
                         + " is currently active and connected to the following dataset(s) \n" + builder.toString());
             } else {
+                JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(
+                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName()));
+                JobUtils.runJob(hcc, spec, true);
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
             }
 
@@ -2120,7 +2132,6 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        boolean readLatchAcquired = true;
         boolean subscriberRegistered = false;
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
@@ -2149,7 +2160,7 @@
             FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
                     mdTxnCtx);
 
-            // All Metadata checks have passed. Feed connect request is valid. //
+            //All Metadata checks have passed. Feed connect request is valid. //
 
             FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
             Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName,
@@ -2165,19 +2176,20 @@
                         feedId.getFeedName());
                 Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
                         metadataProvider, policyAccessor);
-                // adapter configuration are valid at this stage
-                // register the feed joints (these are auto-de-registered)
+                //adapter configuration are valid at this stage
+                //register the feed joints (these are auto-de-registered)
                 for (IFeedJoint fj : triple.third) {
                     FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
                 }
                 JobUtils.runJob(hcc, pair.first, false);
-                /* TODO: Fix record tracking
+                /*
+                 * TODO: Fix record tracking
                  * IFeedAdapterFactory adapterFactory = pair.second;
-                if (adapterFactory.isRecordTrackingEnabled()) {
-                    FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
-                            adapterFactory.createIntakeProgressTracker());
-                }
-                */
+                 * if (adapterFactory.isRecordTrackingEnabled()) {
+                 * FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
+                 * adapterFactory.createIntakeProgressTracker());
+                 * }
+                 */
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
@@ -2186,18 +2198,9 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            readLatchAcquired = false;
             eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
-            }
-            String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
-            boolean waitForCompletion = waitForCompletionParam == null ? false
-                    : Boolean.valueOf(waitForCompletionParam);
-            if (waitForCompletion) {
-                MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                        dataverseName + "." + feedName);
-                readLatchAcquired = false;
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2205,10 +2208,8 @@
             }
             throw e;
         } finally {
-            if (readLatchAcquired) {
-                MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                        dataverseName + "." + feedName);
-            }
+            MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                    dataverseName + "." + feedName);
             if (subscriberRegistered) {
                 FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
             }
@@ -2242,7 +2243,7 @@
         boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
         if (!isFeedJointAvailable) {
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
-            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
+            if (sourceFeedJoint == null) { //the feed is currently not being ingested, i.e., it is unavailable.
                 connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
                 FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
@@ -2262,8 +2263,8 @@
                     functionsToApply.add(f);
                 }
             }
-            // register the compute feed point that represents the final output from the collection of
-            // functions that will be applied.
+            //register the compute feed point that represents the final output from the collection of
+            //functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
                 IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2435,7 +2436,7 @@
             Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), itemTypeName);
 
-            // Prepare jobs to compact the datatset and its indexes
+            //Prepare jobs to compact the dataset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException(
@@ -2523,9 +2524,9 @@
                         ResultReader resultReader = new ResultReader(hcc, hdc);
                         resultReader.open(jobId, metadataProvider.getResultSetId());
 
-                        // In this case (the normal case), we don't use the
-                        // "response" JSONObject - just stream the results
-                        // to the "out" PrintWriter
+                        //In this case (the normal case), we don't use the
+                        //"response" JSONObject - just stream the results
+                        //to the "out" PrintWriter
                         if (sessionConfig.fmt() == OutputFormat.CSV
                                 && sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER)) {
                             ResultUtils.displayCSVHeader(metadataProvider.findOutputRecordType(), sessionConfig);
@@ -2554,7 +2555,7 @@
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
-            // release external datasets' locks acquired during compilation of the query
+            //release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -2615,55 +2616,56 @@
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
 
-            // Dataset exists ?
+            //Dataset exists ?
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
-            // Dataset external ?
+            //Dataset external ?
             if (ds.getDatasetType() != DatasetType.EXTERNAL) {
                 throw new AlgebricksException(
                         "dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset");
             }
-            // Dataset has indexes ?
+            //Dataset has indexes ?
             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException("External dataset " + datasetName + " in dataverse " + dataverseName
                         + " doesn't have any index");
             }
 
-            // Record transaction time
+            //Record transaction time
             Date txnTime = new Date();
 
-            // refresh lock here
+            //refresh lock here
             ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds);
             lockAquired = true;
 
-            // Get internal files
+            //Get internal files
             metadataFiles = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds);
             deletedFiles = new ArrayList<ExternalFile>();
             addedFiles = new ArrayList<ExternalFile>();
             appendedFiles = new ArrayList<ExternalFile>();
 
-            // Compute delta
-            // Now we compare snapshot with external file system
+            //Compute delta
+            //Now we compare snapshot with external file system
             if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
                     appendedFiles)) {
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                // latch will be released in the finally clause
+                //latch will be released in the finally clause
                 return;
             }
 
-            // At this point, we know data has changed in the external file system, record transaction in metadata and start
+            //At this point, we know data has changed in the external file system, record
+            //transaction in metadata and start
             transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds);
             /*
              * Remove old dataset record and replace it with a new one
              */
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
 
-            // Add delta files to the metadata
+            //Add delta files to the metadata
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
@@ -2674,7 +2676,7 @@
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            // Create the files index update job
+            //Create the files index update job
             spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles,
                     appendedFiles, metadataProvider);
 
@@ -2694,10 +2696,10 @@
                 }
             }
 
-            // all index updates has completed successfully, record transaction state
+            //all index updates have completed successfully, record transaction state
             spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, metadataProvider);
 
-            // Aquire write latch again -> start a transaction and record the decision to commit
+            //Acquire write latch again -> start a transaction and record the decision to commit
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2708,9 +2710,9 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
-            // We don't release the latch since this job is expected to be quick
+            //We don't release the latch since this job is expected to be quick
             JobUtils.runJob(hcc, spec, true);
-            // Start a new metadata transaction to record the final state of the transaction
+            //Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2723,11 +2725,11 @@
                     while (iterator.hasNext()) {
                         ExternalFile appendedFile = iterator.next();
                         if (file.getFileName().equals(appendedFile.getFileName())) {
-                            // delete existing file
+                            //delete existing file
                             MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                            // delete existing appended file
+                            //delete existing appended file
                             MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile);
-                            // add the original file with appended information
+                            //add the original file with appended information
                             appendedFile.setFileNumber(file.getFileNumber());
                             appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                             MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile);
@@ -2737,24 +2739,24 @@
                 }
             }
 
-            // remove the deleted files delta
+            //remove the deleted files delta
             for (ExternalFile file : deletedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
             }
 
-            // insert new files
+            //insert new files
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                 file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            // mark the transaction as complete
+            //mark the transaction as complete
             ((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
                     .setState(ExternalDatasetTransactionState.COMMIT);
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
 
-            // commit metadata transaction
+            //commit metadata transaction
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             success = true;
         } catch (Exception e) {
@@ -2766,12 +2768,12 @@
                         + datasetName + ") refresh couldn't carry out the commit phase", e);
             }
             if (transactionState == ExternalDatasetTransactionState.COMMIT) {
-                // Nothing to do , everything should be clean
+                //Nothing to do , everything should be clean
                 throw e;
             }
             if (transactionState == ExternalDatasetTransactionState.BEGIN) {
-                // transaction failed, need to do the following
-                // clean NCs removing transaction components
+                //transaction failed, need to do the following
+                //clean NCs removing transaction components
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2781,12 +2783,12 @@
                 try {
                     JobUtils.runJob(hcc, spec, true);
                 } catch (Exception e2) {
-                    // This should never happen -- fix throw illegal
+                    //This should never happen -- fix throw illegal
                     e.addSuppressed(e2);
                     throw new IllegalStateException("System is in inconsistent state. Failed to abort refresh", e);
                 }
-                // remove the delta of files
-                // return the state of the dataset to committed
+                //remove the delta of files
+                //return the state of the dataset to committed
                 try {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     for (ExternalFile file : deletedFiles) {
@@ -2799,7 +2801,7 @@
                         MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                     }
                     MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
-                    // commit metadata transaction
+                    //commit metadata transaction
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     abort(e, e2, mdTxnCtx);
@@ -2852,19 +2854,20 @@
                     datasetNameFrom, datasetNameTo, mdTxnCtx);
 
             String pregelixHomeKey = "PREGELIX_HOME";
-            // Finds PREGELIX_HOME in system environment variables.
+            //Finds PREGELIX_HOME in system environment variables.
             String pregelixHome = System.getenv(pregelixHomeKey);
-            // Finds PREGELIX_HOME in Java properties.
+            //Finds PREGELIX_HOME in Java properties.
             if (pregelixHome == null) {
                 pregelixHome = System.getProperty(pregelixHomeKey);
             }
-            // Finds PREGELIX_HOME in AsterixDB configuration.
+            //Finds PREGELIX_HOME in AsterixDB configuration.
             if (pregelixHome == null) {
-                // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null.
+                //Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
+                //pregelixHome can never be null.
                 pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
             }
 
-            // Constructs the pregelix command line.
+            //Constructs the pregelix command line.
             List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
                     dataverseNameTo, datasetNameTo);
             ProcessBuilder pb = new ProcessBuilder(cmd);
@@ -2873,9 +2876,9 @@
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            // Executes the Pregelix command.
+            //Executes the Pregelix command.
             int resultState = executeExternalShellProgram(pb);
-            // Checks the return state of the external Pregelix command.
+            //Checks the return state of the external Pregelix command.
             if (resultState != 0) {
                 throw new AlgebricksException(
                         "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. "
@@ -2893,12 +2896,12 @@
         }
     }
 
-    // Prepares to run a program on external runtime.
+    //Prepares to run a program on external runtime.
     private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
                     throws AlgebricksException, AsterixException, Exception {
-        // Validates the source/sink dataverses and datasets.
+        //Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {
             throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
@@ -2911,7 +2914,7 @@
         }
 
         try {
-            // Find the primary index of the sink dataset.
+            //Find the primary index of the sink dataset.
             Index toIndex = null;
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
                     pregelixStmt.getDatasetNameTo().getValue());
@@ -2924,7 +2927,7 @@
             if (toIndex == null) {
                 throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
             }
-            // Cleans up the sink dataset -- Drop and then Create.
+            //Cleans up the sink dataset -- Drop and then Create.
             DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
                     true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
@@ -2941,12 +2944,12 @@
             throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
         }
 
-        // Flushes source dataset.
+        //Flushes source dataset.
         FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom,
                 datasetNameFrom);
     }
 
-    // Executes external shell commands.
+    //Executes external shell commands.
     private int executeExternalShellProgram(ProcessBuilder pb)
             throws IOException, AlgebricksException, InterruptedException {
         Process process = pb.start();
@@ -2972,15 +2975,15 @@
             }
             process.waitFor();
         }
-        // Gets the exit value of the program.
+        //Gets the exit value of the program.
         int resultState = process.exitValue();
         return resultState;
     }
 
-    // Constructs a Pregelix command line.
+    //Constructs a Pregelix command line.
     private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
             String fromDatasetName, String toDataverseName, String toDatasetName) {
-        // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
+        //Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
         String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
@@ -2995,7 +2998,7 @@
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ",");
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
 
-        // construct command
+        //construct command
         List<String> cmds = new ArrayList<String>();
         cmds.add("bin/pregelix");
         cmds.add(pregelixStmt.getParameters().get(0)); // jar
@@ -3008,7 +3011,7 @@
         String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
         boolean custPropAdded = false;
         boolean meetCustProp = false;
-        // User parameters.
+        //User parameters.
         for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
             if (meetCustProp) {
                 if (!s.contains(inputConverterClassKey)) {
@@ -3030,10 +3033,10 @@
 
         if (!custPropAdded) {
             cmds.add(customizedPregelixProperty);
-            // Appends default converter classes to asterixdbParameterBuilder.
+            //Appends default converter classes to asterixdbParameterBuilder.
             asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
             asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
-            // Remove the last comma.
+            //Remove the last comma.
             asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
                     asterixdbParameterBuilder.length());
             cmds.add(asterixdbParameterBuilder.toString());
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 976ca70..8d020e7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
new file mode 100644
index 0000000..70322cb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+    screen-name: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+}
+
+create type TweetMessageType as closed {
+    tweetid: int64,
+    user: TwitterUserType,
+    sender-location: point,
+    send-time: datetime,
+    referred-topics: {{ string }},
+    message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
new file mode 100644
index 0000000..4d2f9c4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
new file mode 100644
index 0000000..e70df33
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
new file mode 100644
index 0000000..34d6285
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
new file mode 100644
index 0000000..5684b1c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql
new file mode 100644
index 0000000..547085f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+    screen-name: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+}
+
+create type TweetMessageType as closed {
+    tweetid: string,
+    tweetid-copy:string,
+    user: TwitterUserType,
+    sender-location: point,
+    send-time: datetime,
+    send-time-copy:datetime,
+    referred-topics: {{ string }},
+    message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql
new file mode 100644
index 0000000..3d7fdbf
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
new file mode 100644
index 0000000..eb18795
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
new file mode 100644
index 0000000..578d458
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+start client 10001 file-client localhost data/twitter/tw_messages.adm 500 50 1000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
new file mode 100644
index 0000000..18bbbbc
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+10000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
new file mode 100644
index 0000000..0862bae
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
new file mode 100644
index 0000000..fd8926b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+
+for $x in dataset Tweets
+order by $x.tweetid
+return $x;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
new file mode 100644
index 0000000..6753868
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+stop 10001
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
new file mode 100644
index 0000000..1295b97
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
new file mode 100644
index 0000000..7047dbc
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
@@ -0,0 +1,10 @@
+{ "tweetid": "1", "tweetid-copy": "1", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("42.13,80.43"), "send-time": datetime("2005-12-05T21:06:41.000Z"), "send-time-copy": datetime("2005-12-05T21:06:41.000Z"), "referred-topics": {{ "samsung", "plan" }}, "message-text": " love samsung the plan is amazing" }
+{ "tweetid": "10", "tweetid-copy": "10", "user": { "screen-name": "Rolldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstful", "followers_count": 3311368i32 }, "sender-location": point("46.94,93.98"), "send-time": datetime("2011-04-07T14:08:46.000Z"), "send-time-copy": datetime("2011-04-07T14:08:46.000Z"), "referred-topics": {{ "t-mobile", "signal" }}, "message-text": " like t-mobile the signal is good" }
+{ "tweetid": "2", "tweetid-copy": "2", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "David Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("28.86,70.44"), "send-time": datetime("2007-08-15T06:44:17.000Z"), "send-time-copy": datetime("2007-08-15T06:44:17.000Z"), "referred-topics": {{ "sprint", "voice-clarity" }}, "message-text": " like sprint its voice-clarity is mind-blowing" }
+{ "tweetid": "3", "tweetid-copy": "3", "user": { "screen-name": "RollandEckhard#500", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Hetfield", "followers_count": 3311368i32 }, "sender-location": point("39.84,86.48"), "send-time": datetime("2008-12-24T00:07:04.000Z"), "send-time-copy": datetime("2008-12-24T00:07:04.000Z"), "referred-topics": {{ "verizon", "voice-command" }}, "message-text": " can't stand verizon its voice-command is terrible:(" }
+{ "tweetid": "4", "tweetid-copy": "4", "user": { "screen-name": "RollandEckhardstein#221", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstinz", "followers_count": 3311368i32 }, "sender-location": point("27.67,87.32"), "send-time": datetime("2007-02-05T16:39:13.000Z"), "send-time-copy": datetime("2007-02-05T16:39:13.000Z"), "referred-topics": {{ "t-mobile", "customer-service" }}, "message-text": " love t-mobile its customer-service is mind-blowing" }
+{ "tweetid": "5", "tweetid-copy": "5", "user": { "screen-name": "RollandEcstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardst", "followers_count": 3311368i32 }, "sender-location": point("27.3,92.77"), "send-time": datetime("2010-09-12T06:15:28.000Z"), "send-time-copy": datetime("2010-09-12T06:15:28.000Z"), "referred-topics": {{ "t-mobile", "customization" }}, "message-text": " like t-mobile the customization is amazing:)" }
+{ "tweetid": "6", "tweetid-copy": "6", "user": { "screen-name": "Rollkhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Kirk Hammette ", "followers_count": 3311368i32 }, "sender-location": point("45.62,84.78"), "send-time": datetime("2012-01-23T06:23:13.000Z"), "send-time-copy": datetime("2012-01-23T06:23:13.000Z"), "referred-topics": {{ "iphone", "network" }}, "message-text": " like iphone its network is awesome:)" }
+{ "tweetid": "7", "tweetid-copy": "7", "user": { "screen-name": "andEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland khardstein", "followers_count": 3311368i32 }, "sender-location": point("44.12,81.46"), "send-time": datetime("2012-02-17T17:30:26.000Z"), "send-time-copy": datetime("2012-02-17T17:30:26.000Z"), "referred-topics": {{ "t-mobile", "network" }}, "message-text": " hate t-mobile the network is bad" }
+{ "tweetid": "8", "tweetid-copy": "8", "user": { "screen-name": "Rolltein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Ron Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("36.86,90.71"), "send-time": datetime("2009-03-12T13:18:04.000Z"), "send-time-copy": datetime("2009-03-12T13:18:04.000Z"), "referred-topics": {{ "at&t", "touch-screen" }}, "message-text": " dislike at&t its touch-screen is OMG" }
+{ "tweetid": "9", "tweetid-copy": "9", "user": { "screen-name": "Roldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckdstein", "followers_count": 3311368i32 }, "sender-location": point("29.07,97.05"), "send-time": datetime("2012-08-15T20:19:46.000Z"), "send-time-copy": datetime("2012-08-15T20:19:46.000Z"), "referred-topics": {{ "verizon", "speed" }}, "message-text": " hate verizon its speed is bad" }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a3a1fba..96a2c37 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -36,6 +36,16 @@
           </compilation-unit>
         </test-case> -->
         <test-case FilePath="feeds">
+            <compilation-unit name="feed-push-socket">
+                <output-dir compare="Text">feed-push-socket</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="drop-dataverse-with-disconnected-feed">
+                <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
             <compilation-unit name="feed-with-external-parser">
                 <output-dir compare="Text">feed-with-external-parser</output-dir>
             </compilation-unit>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
new file mode 100644
index 0000000..765dc71
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class FileFeedSocketAdapterClient implements ITestClient {
+    private final int port;
+    private final int wait;
+    private final String url;
+    private Socket socket;
+    private String path;
+    private int batchSize;
+    private int maxCount;
+    private OutputStream out = null;
+
+    // expected args: url, source-file-path, max-count, batch-size, wait
+    public FileFeedSocketAdapterClient(int port, String[] args) throws Exception {
+        this.port = port;
+        if (args.length != 5) {
+            throw new Exception(
+                    "Invalid arguments for FileFeedSocketAdapterClient. Expected arguments <url> <source-file-path> <max-count> <batch-size> <wait>");
+        }
+        this.url = args[0];
+        this.path = args[1];
+        this.maxCount = Integer.parseInt(args[2]);
+        this.batchSize = Integer.parseInt(args[3]);
+        this.wait = Integer.parseInt(args[4]);
+    }
+
+    @Override
+    public void start() {
+        try {
+            socket = new Socket(url, port);
+        } catch (IOException e) {
+            System.err.println("Problem in creating socket against host " + url + " on the port " + port);
+            e.printStackTrace();
+        }
+
+        int recordCount = 0;
+        BufferedReader br = null;
+        try {
+            out = socket.getOutputStream();
+            br = new BufferedReader(new FileReader(path));
+            String nextRecord;
+            while ((nextRecord = br.readLine()) != null) {
+                ByteBuffer b = StandardCharsets.UTF_8.encode(nextRecord);
+                if (wait >= 1 && recordCount % batchSize == 0) {
+                    Thread.sleep(wait);
+                }
+                out.write(b.array(), 0, b.limit());
+                recordCount++;
+                if (recordCount == maxCount) {
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (out != null) {
+            try {
+                out.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+        } catch (IOException e) {
+            System.err.println("Problem in closing socket against host " + url + " on the port " + port);
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
new file mode 100644
index 0000000..56d626d
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+public interface ITestClient {
+
+    public void start() throws Exception;
+
+    public void stop() throws Exception;
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
new file mode 100644
index 0000000..d26351b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.util.Arrays;
+
+public class TestClientProvider {
+
+    public static ITestClient createTestClient(String[] args, int port) throws Exception {
+        if (args.length < 1) {
+            throw new Exception("Unspecified test client");
+        }
+        String clientName = args[0];
+        String[] clientArgs = Arrays.copyOfRange(args, 1, args.length);
+        switch (clientName) {
+            case "file-client":
+                return new FileFeedSocketAdapterClient(port, clientArgs);
+            default:
+                throw new Exception("Unknown test client: " + clientName);
+        }
+    }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index f40cce4..ba32af2 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -26,10 +26,10 @@
 import java.net.Socket;
 
 public class FileTestServer implements ITestServer {
-    private String[] paths;
-    private final int port;
-    private ServerSocket serverSocket;
-    private Thread listenerThread;
+    protected String[] paths;
+    protected final int port;
+    protected ServerSocket serverSocket;
+    protected Thread listenerThread;
 
     public FileTestServer(int port) {
         this.port = port;
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
index 18a4969..b3b1183 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
@@ -20,7 +20,7 @@
 
 public interface ITestServer {
 
-    public void configure(String[] args);
+    public void configure(String[] args) throws Exception;
 
     public void start() throws Exception;
 
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
new file mode 100644
index 0000000..1c2cef6
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/** File test server that sends its files to each client and then keeps the socket open until stop() is called. */
+public class OpenSocketFileTestServer extends FileTestServer {
+    // Guarded by the serverSocket monitor; set once stop() has been called.
+    private boolean closed;
+
+    public OpenSocketFileTestServer(int port) {
+        super(port);
+    }
+
+    /** Opens the server socket and starts a listener that serves every accepted connection on its own thread. */
+    @Override
+    public void start() throws IOException {
+        serverSocket = new ServerSocket(port);
+        listenerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!serverSocket.isClosed()) {
+                    try {
+                        Socket socket = serverSocket.accept();
+                        new Thread(new SocketThread(socket)).start();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        // Possibly caused by stop() closing the server socket; the loop re-checks isClosed().
+                    }
+                }
+            }
+        });
+        listenerThread.start();
+    }
+
+    private class SocketThread implements Runnable { // serves one client, then parks until stop()
+        private Socket socket;
+        private OutputStream os;
+
+        public SocketThread(Socket socket) {
+            this.socket = socket;
+        }
+
+        @Override
+        public void run() {
+            try {
+                os = socket.getOutputStream();
+                byte[] chunk = new byte[1024];
+                for (String path : paths) {
+                    try (FileInputStream fin = new FileInputStream(new File(path))) {
+                        int read = fin.read(chunk);
+                        while (read > 0) {
+                            os.write(chunk, 0, read);
+                            read = fin.read(chunk);
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+                th.printStackTrace();
+                // Either the peer closed the connection or stop() was called.
+            } finally {
+                synchronized (serverSocket) {
+                    try {
+                        while (!closed) { // re-check: wait() may wake up spuriously
+                            serverSocket.wait();
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt(); // stop waiting but keep the interrupt visible
+                    }
+                }
+                try {
+                    if (os != null) { // null when getOutputStream() itself failed
+                        os.close();
+                    }
+                } catch (Throwable th) {
+                    th.printStackTrace();
+                }
+                try {
+                    socket.close();
+                } catch (Throwable th) {
+                    th.printStackTrace();
+                }
+            }
+        }
+    }
+
+    /** Closes the server socket, waits for the listener to exit, then releases all parked client threads. */
+    @Override
+    public void stop() throws IOException, InterruptedException {
+        synchronized (serverSocket) {
+            closed = true;
+            try {
+                serverSocket.close();
+                if (listenerThread.isAlive()) {
+                    listenerThread.join();
+                }
+            } finally {
+                serverSocket.notifyAll();
+            }
+        }
+    }
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
new file mode 100644
index 0000000..3312d1b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import org.apache.asterix.test.client.ITestClient;
+import org.apache.asterix.test.client.TestClientProvider;
+
+/** Adapts an ITestClient to the ITestServer lifecycle: configure() creates it, start()/stop() delegate to it. */
+public class TestClientServer implements ITestServer {
+    // port of the server to connect to
+    private final int port;
+    private ITestClient client; // assigned by configure(); start() and stop() dereference it
+
+    public TestClientServer(int port) {
+        this.port = port;
+    }
+
+    @Override
+    public void configure(String[] args) throws Exception {
+        client = TestClientProvider.createTestClient(args, port); // args[0] names the client type
+    }
+
+    @Override
+    public void start() throws Exception {
+        client.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        client.stop();
+    }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
index 60c1c11..0bdb74e 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
@@ -26,6 +26,10 @@
                 return new FileTestServer(port);
             case "rss":
                 return new RSSTestServer(port);
+            case "open-socket-file":
+                return new OpenSocketFileTestServer(port);
+            case "client":
+                return new TestClientServer(port);
             default:
                 throw new Exception("Unknown test server");
         }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d5b1c6e..851acd4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -38,7 +38,7 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
@@ -66,7 +66,7 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return dataSourceFactory.getPartitionConstraint();
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 17916e5..3965e5e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -22,7 +22,7 @@
 import java.util.Map;
 
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -50,7 +50,7 @@
      * In the former case, the IP address is translated to a node controller id
      * running on the node with the given IP address.
      */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
 
     /**
      * Creates an instance of IDatasourceAdapter.
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 370ea93..1487cf1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -19,9 +19,12 @@
 package org.apache.asterix.external.api;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Map;
 
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 
 public interface IExternalDataSourceFactory extends Serializable {
 
@@ -45,7 +48,7 @@
      * @return
      * @throws Exception
      */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
 
     /**
      * Configure the data parser factory. The passed map contains key value pairs from the
@@ -63,4 +66,32 @@
         return false;
     }
 
+    /**
+     * Returns {@code constraints} unchanged when non-null; otherwise builds {@code count} locations by cycling
+     * over the store nodes, giving each node one slot per IO device on every pass.
+     */
+    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+            AlgebricksAbsolutePartitionConstraint constraints, int count) {
+        if (constraints != null) {
+            return constraints;
+        }
+        ArrayList<String> locs = new ArrayList<String>();
+        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+        while (locs.size() < count) {
+            int before = locs.size();
+            for (String node : stores.keySet()) {
+                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+                for (int k = 0; k < numIODevices && locs.size() < count; k++) {
+                    locs.add(node);
+                }
+                if (locs.size() == count) {
+                    break;
+                }
+            }
+            if (locs.size() == before) { // no node offered a slot: fail instead of looping forever
+                throw new IllegalStateException("No IO devices available for " + count + " partitions");
+            }
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index adb2602..fdc54d6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -25,5 +25,4 @@
     public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception;
 
     public Class<? extends T> getRecordClass();
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 5b3828d..6e3ead2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -40,7 +40,6 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
@@ -51,7 +50,7 @@
         implements IInputStreamProviderFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
 
     protected static final long serialVersionUID = 1L;
-    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     protected String[] readSchedule;
     protected boolean read[];
     protected InputSplitsFactory inputSplitsFactory;
@@ -76,7 +75,7 @@
         JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
         confFactory = new ConfFactory(conf);
         clusterLocations = getPartitionConstraint();
-        int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+        int numPartitions = clusterLocations.getLocations().length;
         // if files list was set, we restrict the splits to the list
         InputSplit[] inputSplits;
         if (files == null) {
@@ -99,7 +98,8 @@
         }
     }
 
-    // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
+    // Used to tell the factory to restrict the splits to the intersection between this list and the
+    // actual files on hdfs side
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
         this.files = files;
@@ -108,7 +108,8 @@
 
     /*
      * The method below was modified to take care of the following
-     * 1. when target files are not null, it generates a file aware input stream that validate against the files
+     * 1. when target files are not null, it generates a file-aware input stream that validates
+     * against the files
      * 2. if the data is binary, it returns a generic reader
      */
     @Override
@@ -135,7 +136,7 @@
      * @return
      */
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
         return clusterLocations;
     }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
index b9b6f65..b715a26 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -29,7 +29,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.couchbase.client.core.CouchbaseCore;
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 
@@ -100,7 +100,8 @@
     }
 
     /*
-     * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+     * We distribute the work of streaming vbuckets between all the partitions in a round robin
+     * fashion.
      */
     private void schedule() {
         schedule = new int[numOfVBuckets];
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index c302b9b..22488f7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -28,14 +28,14 @@
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
 
     protected static final long serialVersionUID = 1L;
-    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     protected ConfFactory confFactory;
     protected Map<String, String> configuration;
 
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
         return clusterLocations;
     }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index bbe485c..beceea8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -22,11 +22,11 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.sun.syndication.feed.synd.SyndEntryImpl;
@@ -36,6 +36,7 @@
     private static final long serialVersionUID = 1L;
     private Map<String, String> configuration;
     private List<String> urls = new ArrayList<String>();
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -43,8 +44,10 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(urls.size());
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        int count = urls.size();
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count);
+        return clusterLocations;
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
index f02bd93..d02de03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -29,7 +29,7 @@
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public abstract class AbstractStreamRecordReaderFactory<T>
@@ -51,7 +51,7 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return inputStreamFactory.getPartitionConstraint();
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 9b2d095..f41486e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -77,7 +77,7 @@
         inString = false;
         depth = 0;
         do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
+            int startPosn = bufferPosn; // starting from where we left off the last time
             if (bufferPosn >= bufferLength) {
                 startPosn = bufferPosn = 0;
                 bufferLength = reader.read(inputBuffer);
@@ -87,7 +87,7 @@
                 }
             }
             if (!hasStarted) {
-                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
                     if (inputBuffer[bufferPosn] == recordStart) {
                         startPosn = bufferPosn;
                         hasStarted = true;
@@ -108,7 +108,7 @@
                 }
             }
             if (hasStarted) {
-                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
                     if (inString) {
                         // we are in a string, we only care about the string end
                         if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index f38c2cb..a2a4742 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -23,6 +23,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -30,8 +31,7 @@
 import org.apache.asterix.external.util.TwitterUtil;
 import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
 import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import twitter4j.Status;
@@ -46,6 +46,7 @@
 
     private Map<String, String> configuration;
     private boolean pull;
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -53,8 +54,9 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
+        return clusterLocations;
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index e780c95..89008aa 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -20,16 +20,28 @@
 
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 
 public class AInputStreamReader extends InputStreamReader {
     private AInputStream in;
+    private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private CharsetDecoder decoder;
+    private boolean done = false;
 
     public AInputStreamReader(AInputStream in) {
         super(in);
         this.in = in;
+        this.decoder = StandardCharsets.UTF_8.newDecoder();
+        this.byteBuffer.flip();
     }
 
     public boolean skipError() throws Exception {
@@ -51,4 +63,33 @@
     public void setFeedLogManager(FeedLogManager feedLogManager) {
         in.setFeedLogManager(feedLogManager);
     }
+
+    @Override
+    public int read(char cbuf[]) throws IOException {
+        return read(cbuf, 0, cbuf.length);
+    }
+
+    /** Decodes UTF-8 stream bytes into at most {@code length} chars of {@code cbuf}; returns -1 at end of stream. */
+    @Override
+    public int read(char cbuf[], int offset, int length) throws IOException {
+        CharBuffer out = CharBuffer.wrap(cbuf, offset, length); // decode straight into the caller's window
+        while (length > 0) {
+            java.nio.charset.CoderResult result = decoder.decode(byteBuffer, out, done);
+            if (out.position() > offset) {
+                return out.position() - offset;
+            }
+            if (!result.isUnderflow()) { // malformed bytes, or no room for a surrogate pair: fail, don't stall
+                throw new IOException("Cannot decode UTF-8 input: " + result);
+            }
+            if (done) {
+                return -1;
+            }
+            byteBuffer.compact(); // keep a trailing partial multi-byte sequence for the next decode
+            int len = in.read(bytes, byteBuffer.position(), byteBuffer.remaining());
+            done = len < 0;
+            byteBuffer.position(byteBuffer.position() + Math.max(len, 0));
+            byteBuffer.flip();
+        }
+        return 0;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
index 1e86f39..cf8d339 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -25,7 +25,9 @@
 import java.util.Map;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SocketInputStream extends AInputStream {
     private ServerSocket server;
@@ -34,8 +36,13 @@
 
     public SocketInputStream(ServerSocket server) throws IOException {
         this.server = server;
-        socket = server.accept();
-        connectionStream = socket.getInputStream();
+        socket = new Socket();
+        connectionStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return -1;
+            }
+        };
     }
 
     @Override
@@ -56,20 +63,31 @@
 
     @Override
     public int read(byte b[]) throws IOException {
-        int read = connectionStream.read(b, 0, b.length);
-        while (read < 0) {
-            accept();
-            read = connectionStream.read(b, 0, b.length);
-        }
-        return read;
+        return read(b, 0, b.length);
     }
 
     @Override
     public int read(byte b[], int off, int len) throws IOException {
-        int read = connectionStream.read(b, off, len);
-        while (read < 0) {
-            accept();
+        if (server == null) {
+            return -1;
+        }
+        int read = -1;
+        try {
             read = connectionStream.read(b, off, len);
+        } catch (IOException e) {
+            e.printStackTrace();
+            read = -1;
+        }
+        while (read < 0) {
+            if (!accept()) {
+                return -1;
+            }
+            try {
+                read = connectionStream.read(b, off, len);
+            } catch (IOException e) {
+                e.printStackTrace();
+                read = -1;
+            }
         }
         return read;
     }
@@ -85,22 +103,57 @@
     }
 
     @Override
-    public void close() throws IOException {
-        connectionStream.close();
-        socket.close();
-        server.close();
+    public synchronized void close() throws IOException {
+        HyracksDataException hde = null;
+        try {
+            if (connectionStream != null) {
+                connectionStream.close();
+            }
+            connectionStream = null;
+        } catch (IOException e) {
+            hde = new HyracksDataException(e);
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+            socket = null;
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        }
+        try {
+            if (server != null) {
+                server.close();
+            }
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        } finally {
+            server = null;
+        }
+        if (hde != null) {
+            throw hde;
+        }
     }
 
-    private void accept() throws IOException {
-        connectionStream.close();
-        socket.close();
-        socket = server.accept();
-        connectionStream = socket.getInputStream();
+    private boolean accept() throws IOException {
+        try {
+            connectionStream.close();
+            connectionStream = null;
+            socket.close();
+            socket = null;
+            socket = server.accept();
+            connectionStream = socket.getInputStream();
+            return true;
+        } catch (Exception e) {
+            close();
+            return false;
+        }
     }
 
     @Override
     public boolean stop() throws Exception {
-        return false;
+        close();
+        return true;
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 3f70ce1..5c1583e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -34,7 +34,6 @@
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.NodeResolverFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -48,7 +47,8 @@
     protected static INodeResolver nodeResolver;
     protected Map<String, String> configuration;
     protected FileSplit[] inputFileSplits;
-    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log
+                                             // storage
     protected boolean isFeed;
     protected String expression;
     // transient fields (They don't need to be serialized and transferred)
@@ -84,7 +84,7 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return constraints;
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index ea60f43..6fdc42d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -35,7 +35,6 @@
 import org.apache.asterix.om.util.AsterixRuntimeUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
@@ -106,7 +105,7 @@
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         List<String> locations = new ArrayList<String>();
         for (Pair<String, Integer> socket : sockets) {
             locations.add(socket.first);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
index 484626a..95378cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStreamProvider;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -54,7 +53,7 @@
     private Map<String, String> configuration;
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
         String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
         String[] locations = null;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index e39b507..cd4a3c1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -58,7 +58,6 @@
 
     @Override
     public AInputStream getInputStream() throws Exception {
-        twitterServer.start();
         return twitterServer;
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 7e28c35..d0348c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -149,7 +149,7 @@
     private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
         int waitCycleCount = 0;
         ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 10) {
+        while (ingestionRuntime == null && waitCycleCount < 1000) {
             try {
                 Thread.sleep(3000);
                 waitCycleCount++;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 3cb5d64..36c11e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -241,7 +241,8 @@
         FeedRuntimeId runtimeId = null;
         FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
         if (endFeedMessage.isCompleteDisconnection()) {
-            // subscribableRuntimeType represents the location at which the feed connection receives data
+            // subscribableRuntimeType represents the location at which the feed connection receives
+            // data
             FeedRuntimeType runtimeType = null;
             switch (subscribableRuntimeType) {
                 case INTAKE:
@@ -257,15 +258,19 @@
             runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
             CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
                     .getFeedRuntime(connectionId, runtimeId);
-            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            if (feedRuntime != null) {
+                feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            }
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
             }
         } else {
-            // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+            // subscribaleRuntimeType represents the location for data hand-off in presence of
+            // subscribers
             switch (subscribableRuntimeType) {
                 case INTAKE:
-                    // illegal state as data hand-off from one feed to another does not happen at intake
+                    // illegal state as data hand-off from one feed to another does not happen at
+                    // intake
                     throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
                 case COMPUTE:
                     // feed could be primary or secondary, doesn't matter
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index c128545..50d8ac0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -58,13 +58,12 @@
 
     public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws Exception {
-        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String[] locations = null;
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
-        } else {
-            locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         }
+        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        String[] locations = null;
+        locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<FileSplit>();
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         int i = 0;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 7ac0428..9a72135 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -42,7 +42,6 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -199,8 +198,8 @@
         return conf;
     }
 
-    public static AlgebricksPartitionConstraint getPartitionConstraints(
-            AlgebricksPartitionConstraint clusterLocations) {
+    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+            AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
             ArrayList<String> locs = new ArrayList<String>();
             Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index e34a09b..6b11d21 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -23,14 +23,14 @@
 
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,14 +48,17 @@
 
     private Map<String, String> configuration;
 
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
     @Override
     public String getAlias() {
         return "test_typed";
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
+        return clusterLocations;
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/660
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Gerrit-PatchSet: 11
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>
Gerrit-Reviewer: Ildar Absalyamov <ildar.absalyamov@gmail.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <tillw@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>


Mime
View raw message