asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1302 Fix Deadlock with Feed Connection
Date Wed, 24 Feb 2016 15:23:53 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/660

Change subject: ASTERIXDB-1302 Fix Deadlock with Feed Connection
......................................................................

ASTERIXDB-1302 Fix Deadlock with Feed Connection

A bug caused a read lock to never be released when a feed was
connected with "wait-for-completion" set to false. This change
fixes the bug and adds a test case that reproduces it.

Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
M asterix-app/src/test/resources/runtimets/testsuite.xml
A asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
M asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
23 files changed, 711 insertions(+), 114 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/60/660/1

diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..be9452b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,8 @@
             if (tempPath.endsWith(File.separator)) {
                 tempPath = tempPath.substring(0, tempPath.length() - 1);
             }
-            //get initial partitions from properties
+            System.err.println("Using the path: " + tempPath);
+            // get initial partitions from properties
             String[] nodeStores = propertiesAccessor.getStores().get(ncName);
             if (nodeStores == null) {
                 throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +98,7 @@
                 tempDirPath += File.separator;
             }
             for (int p = 0; p < nodeStores.length; p++) {
-                //create IO devices based on stores
+                // create IO devices based on stores
                 String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
                 File ioDeviceDir = new File(iodevicePath);
                 ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index c0245d7..5cd490a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedMessage;
@@ -37,9 +38,11 @@
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
 import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.file.JobSpecificationUtils;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -49,6 +52,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 
 /**
@@ -251,4 +257,17 @@
                 completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
         return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
     }
+
+    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
+                locations);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
+                .splitProviderAndPartitionConstraints(feedLogFileSplits);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
+        spec.addRoot(frod);
+        return spec;
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 9f024e9..90eb7b9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@
         ASYNC_DEFERRED
     }
 
-    public static final boolean IS_DEBUG_MODE = false;//true
+    public static final boolean IS_DEBUG_MODE = false;// true
     private final List<Statement> statements;
     private final SessionConfig sessionConfig;
     private Dataverse activeDefaultDataverse;
@@ -593,7 +593,8 @@
                     }
                     if (compactionPolicy == null) {
                         if (filterField != null) {
-                            // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the
+                            // If the dataset has a filter and the user didn't specify a merge
+                            // policy, then we will pick the
                             // correlated-prefix as the default merge policy.
                             compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
                             compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
@@ -615,12 +616,12 @@
 
             }
 
-            //#. initialize DatasetIdFactory if it is not initialized.
+            // #. initialize DatasetIdFactory if it is not initialized.
             if (!DatasetIdFactory.isInitialized()) {
                 DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
             }
 
-            //#. add a new dataset with PendingAddOp
+            // #. add a new dataset with PendingAddOp
             dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, ngName,
                     compactionPolicy, compactionPolicyProperties, datasetDetails, dd.getHints(), dsType,
                     DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
@@ -632,21 +633,21 @@
                 JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
                         metadataProvider);
 
-                //#. make metadataTxn commit before calling runJob.
+                // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. runJob
+                // #. runJob
                 JobUtils.runJob(hcc, jobSpec, true);
 
-                //#. begin new metadataTxn
+                // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
             MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
             dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
@@ -658,11 +659,12 @@
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
 
-                //#. execute compensation operations
-                //   remove the index in NC
-                //   [Notice]
-                //   As long as we updated(and committed) metadata, we should remove any effect of the job
-                //   because an exception occurs during runJob.
+                // #. execute compensation operations
+                // remove the index in NC
+                // [Notice]
+                // As long as we updated(and committed) metadata, we should remove any effect of the
+                // job
+                // because an exception occurs during runJob.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -679,7 +681,7 @@
                     }
                 }
 
-                //   remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -880,8 +882,10 @@
                 }
             }
 
-            // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
-            // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
+            // Checks whether a user is trying to create an inverted secondary index on a dataset
+            // with a variable-length primary key.
+            // Currently, we do not support this. Therefore, as a temporary solution, we print an
+            // error message and stop.
             if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -959,7 +963,7 @@
                 }
             }
 
-            //check whether there exists another enforced index on the same field
+            // check whether there exists another enforced index on the same field
             if (stmtCreateIndex.isEnforced()) {
                 List<Index> indexes = MetadataManager.INSTANCE
                         .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
@@ -973,7 +977,7 @@
                 }
             }
 
-            //#. add a new index with PendingAddOp
+            // #. add a new index with PendingAddOp
             Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
                     indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), false,
                     IMetadataEntity.PENDING_ADD_OP);
@@ -984,7 +988,7 @@
                 enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, index);
             }
 
-            //#. prepare to create the index artifact in NC.
+            // #. prepare to create the index artifact in NC.
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
                     index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
@@ -998,14 +1002,14 @@
 
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-            //#. create the index artifact in NC.
+            // #. create the index artifact in NC.
             JobUtils.runJob(hcc, spec, true);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. load data into the index in NC.
+            // #. load data into the index in NC.
             cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
                     index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
                     index.getGramLength(), index.getIndexType());
@@ -1015,17 +1019,18 @@
 
             JobUtils.runJob(hcc, spec, true);
 
-            //#. begin new metadataTxn
+            // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+            // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
             MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                     indexName);
             index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            // add another new files index with PendingNoOp after deleting the index with PendingAddOp
+            // add another new files index with PendingNoOp after deleting the index with
+            // PendingAddOp
             if (firstExternalDatasetIndex) {
                 MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                         filesIndex.getIndexName());
@@ -1041,7 +1046,8 @@
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
-            // If files index was replicated for external dataset, it should be cleaned up on NC side
+            // If files index was replicated for external dataset, it should be cleaned up on NC
+            // side
             if (filesIndexReplicated) {
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
@@ -1062,8 +1068,8 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //   remove the index in NC
+                // #. execute compensation operations
+                // remove the index in NC
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1183,7 +1189,6 @@
         MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
                 if (stmtDelete.getIfExists()) {
@@ -1194,7 +1199,7 @@
                 }
             }
 
-            //# disconnect all feeds from any datasets in the dataverse.
+            // # disconnect all feeds from any datasets in the dataverse.
             List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
                     .getActiveFeedConnections(null);
             DisconnectFeedStatement disStmt = null;
@@ -1216,10 +1221,15 @@
                                     + connection.getDatasetName() + ". Encountered exception " + exception);
                         }
                     }
+                    // prepare job to remove feed log storage
+                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
                 }
             }
 
-            //#. prepare jobs which will drop corresponding datasets with indexes.
+            // #. prepare jobs which will drop corresponding datasets with indexes.
+
+            // #. prepare jobs which will drop corresponding datasets with indexes.
             List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
             for (int j = 0; j < datasets.size(); j++) {
                 String datasetName = datasets.get(j).getDatasetName();
@@ -1259,9 +1269,10 @@
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-            //#. mark PendingDropOp on the dataverse record by
-            //   first, deleting the dataverse record from the DATAVERSE_DATASET
-            //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+            // #. mark PendingDropOp on the dataverse record by
+            // first, deleting the dataverse record from the DATAVERSE_DATASET
+            // second, inserting the dataverse record with the PendingDropOp value into the
+            // DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
@@ -1278,7 +1289,7 @@
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. finally, delete the dataverse.
+            // #. finally, delete the dataverse.
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
                 activeDefaultDataverse = null;
@@ -1294,18 +1305,18 @@
                     activeDefaultDataverse = null;
                 }
 
-                //#. execute compensation operations
-                //   remove the all indexes in NC
+                // #. execute compensation operations
+                // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do no throw exception since still the metadata needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //   remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
                     MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
@@ -1366,7 +1377,7 @@
                     }
                 }
 
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -1378,7 +1389,7 @@
                 CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                 jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1390,12 +1401,12 @@
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //# disconnect the feeds
+                // # disconnect the feeds
                 for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
                     JobUtils.runJob(hcc, p.first, true);
                 }
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1406,7 +1417,7 @@
             } else {
                 // External dataset
                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
@@ -1421,7 +1432,7 @@
                     }
                 }
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1433,7 +1444,7 @@
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1445,7 +1456,7 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. finally, delete the dataset.
+            // #. finally, delete the dataset.
             MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
             // Drop the associated nodegroup
             String nodegroup = ds.getNodeGroupName();
@@ -1460,18 +1471,18 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //   remove the all indexes in NC
+                // #. execute compensation operations
+                // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do no throw exception since still the metadata needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //   remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1545,18 +1556,18 @@
                         throw new AlgebricksException("There is no index with this name " + indexName + ".");
                     }
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
                                 IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1565,12 +1576,12 @@
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
             } else {
                 // External dataset
@@ -1586,7 +1597,7 @@
                 } else if (ExternalIndexingOperations.isFileIndex(index)) {
                     throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
                 List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
@@ -1600,7 +1611,7 @@
                                     externalIndex.getIndexName());
                             jobsToExecute.add(
                                     ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
-                            //#. mark PendingDropOp on the existing files index
+                            // #. mark PendingDropOp on the existing files index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             MetadataManager.INSTANCE.addIndex(mdTxnCtx,
@@ -1612,14 +1623,14 @@
                     }
                 }
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
                                 IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1628,12 +1639,12 @@
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (dropFilesIndex) {
                     // delete the files index too
@@ -1651,18 +1662,18 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //   remove the all indexes in NC
+                // #. execute compensation operations
+                // remove all the indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do not throw the exception, since the metadata still needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //   remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1930,14 +1941,12 @@
 
     private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
-
         CreateFeedStatement cfs = (CreateFeedStatement) stmt;
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
-
         Feed feed = null;
         try {
             feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -2120,7 +2129,6 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        boolean readLatchAcquired = true;
         boolean subscriberRegistered = false;
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
@@ -2171,13 +2179,14 @@
                     FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
                 }
                 JobUtils.runJob(hcc, pair.first, false);
-                /* TODO: Fix record tracking
+                /*
+                 * TODO: Fix record tracking
                  * IFeedAdapterFactory adapterFactory = pair.second;
-                if (adapterFactory.isRecordTrackingEnabled()) {
-                    FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
-                            adapterFactory.createIntakeProgressTracker());
-                }
-                */
+                 * if (adapterFactory.isRecordTrackingEnabled()) {
+                 * FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
+                 * adapterFactory.createIntakeProgressTracker());
+                 * }
+                 */
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
@@ -2186,7 +2195,6 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            readLatchAcquired = false;
             eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
@@ -2197,7 +2205,6 @@
             if (waitForCompletion) {
                 MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
                         dataverseName + "." + feedName);
-                readLatchAcquired = false;
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2205,10 +2212,8 @@
             }
             throw e;
         } finally {
-            if (readLatchAcquired) {
-                MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                        dataverseName + "." + feedName);
-            }
+            MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                    dataverseName + "." + feedName);
             if (subscriberRegistered) {
                 FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
             }
@@ -2242,7 +2247,8 @@
         boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
         if (!isFeedJointAvailable) {
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
-            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
+            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is
+                                           // unavailable.
                 connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
                 FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
@@ -2262,7 +2268,8 @@
                     functionsToApply.add(f);
                 }
             }
-            // register the compute feed point that represents the final output from the collection of
+            // register the compute feed point that represents the final output from the
+            // collection of
             // functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
@@ -2475,7 +2482,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            //#. run the jobs
+            // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
                 JobUtils.runJob(hcc, jobSpec, true);
             }
@@ -2656,7 +2663,8 @@
                 return;
             }
 
-            // At this point, we know data has changed in the external file system, record transaction in metadata and start
+            // At this point, we know data has changed in the external file system, record
+            // transaction in metadata and start
             transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds);
             /*
              * Remove old dataset record and replace it with a new one
@@ -2682,14 +2690,14 @@
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.BEGIN;
 
-            //run the files update job
+            // run the files update job
             JobUtils.runJob(hcc, spec, true);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles,
                             addedFiles, appendedFiles, metadataProvider);
-                    //run the files update job
+                    // run the files update job
                     JobUtils.runJob(hcc, spec, true);
                 }
             }
@@ -2860,7 +2868,8 @@
             }
             // Finds PREGELIX_HOME in AsterixDB configuration.
             if (pregelixHome == null) {
-                // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null.
+                // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
+                // pregelixHome can never be null.
                 pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
             }
 
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 976ca70..8d020e7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 5e76ecb..e62c315 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -59,7 +59,7 @@
             LOGGER.info("initializing HDFS");
         }
 
-        HDFSCluster.getInstance().setup();
+        // HDFSCluster.getInstance().setup();
 
         // Set the node resolver to be the identity resolver that expects node
         // names
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
new file mode 100644
index 0000000..8cd89da
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with a disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+    screen-name: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+}
+
+create type TweetMessageType as closed {
+    tweetid: int64,
+    user: TwitterUserType,
+    sender-location: point,
+    send-time: datetime,
+    referred-topics: {{ string }},
+    message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
new file mode 100644
index 0000000..9dcd753
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Connect a feed to a dataset with wait-for-completion-feed set to false
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
new file mode 100644
index 0000000..97b7013
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Pause between connecting the feed and disconnecting it
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
new file mode 100644
index 0000000..4106d42
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with a disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
new file mode 100644
index 0000000..8aaec06
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Drop a dataverse with a disconnected feed
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a3a1fba..84c6ad3 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -36,6 +36,11 @@
           </compilation-unit>
         </test-case> -->
         <test-case FilePath="feeds">
+            <compilation-unit name="drop-dataverse-with-disconnected-feed">
+                <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
             <compilation-unit name="feed-with-external-parser">
                 <output-dir compare="Text">feed-with-external-parser</output-dir>
             </compilation-unit>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
new file mode 100644
index 0000000..6082269
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+public class FileFeedSocketAdapterClient implements ITestClient {
+    private final int port;
+    private final int wait;
+    private final String url;
+    private Socket socket;
+    private String path;
+    private int batchSize;
+    private int maxCount;
+    private OutputStream out = null;
+
+    // expected args: url, source-file-path, max-count, batch-size, wait
+    public FileFeedSocketAdapterClient(int port, String[] args) throws Exception {
+        this.port = port;
+        if (args.length != 5) {
+            throw new Exception(
+                    "Invalid arguments for FileFeedSocketAdapterClient. Expected arguments <url> <source-file-path> <max-count> <batch-size> <wait>");
+        }
+        this.url = args[0];
+        this.path = args[1];
+        this.maxCount = Integer.parseInt(args[2]);
+        this.batchSize = Integer.parseInt(args[3]);
+        this.wait = Integer.parseInt(args[4]);
+    }
+
+    @Override
+    public void start() {
+        try {
+            socket = new Socket(url, port);
+        } catch (IOException e) {
+            System.err.println("Problem in creating socket against host " + url + " on the port " + port);
+            e.printStackTrace();
+        }
+
+        int recordCount = 0;
+        BufferedReader br = null;
+        try {
+            out = socket.getOutputStream();
+            br = new BufferedReader(new FileReader(path));
+            String nextRecord;
+            byte[] b;
+            byte[] newLineBytes = "\n".getBytes();
+
+            while ((nextRecord = br.readLine()) != null) {
+                b = nextRecord.replaceAll("\\s+", " ").getBytes();
+                if (wait >= 1 && recordCount % batchSize == 0) {
+                    Thread.sleep(wait);
+                }
+                out.write(b);
+                out.write(newLineBytes);
+                recordCount++;
+                if (recordCount % 100000 == 0) {
+                    System.err.println("send " + recordCount);
+                }
+                if (recordCount == maxCount) {
+                    break;
+                }
+            }
+            System.err.println("send " + recordCount);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (out != null) {
+            try {
+                out.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+        } catch (IOException e) {
+            System.err.println("Problem in closing socket against host " + url + " on the port " + port);
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
new file mode 100644
index 0000000..56d626d
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+public interface ITestClient {
+
+    public void start() throws Exception;
+
+    public void stop() throws Exception;
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
new file mode 100644
index 0000000..d26351b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.util.Arrays;
+
+public class TestClientProvider {
+
+    public static ITestClient createTestClient(String[] args, int port) throws Exception {
+        if (args.length < 1) {
+            throw new Exception("Unspecified test client");
+        }
+        String clientName = args[0];
+        String[] clientArgs = Arrays.copyOfRange(args, 1, args.length);
+        switch (clientName) {
+            case "file-client":
+                return new FileFeedSocketAdapterClient(port, clientArgs);
+            default:
+                throw new Exception("Unknown test client: " + clientName);
+        }
+    }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index f40cce4..ba32af2 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -26,10 +26,10 @@
 import java.net.Socket;
 
 public class FileTestServer implements ITestServer {
-    private String[] paths;
-    private final int port;
-    private ServerSocket serverSocket;
-    private Thread listenerThread;
+    protected String[] paths;
+    protected final int port;
+    protected ServerSocket serverSocket;
+    protected Thread listenerThread;
 
     public FileTestServer(int port) {
         this.port = port;
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
index 18a4969..b3b1183 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
@@ -20,7 +20,7 @@
 
 public interface ITestServer {
 
-    public void configure(String[] args);
+    public void configure(String[] args) throws Exception;
 
     public void start() throws Exception;
 
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
new file mode 100644
index 0000000..d417cf6
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class OpenSocketFileTestServer extends FileTestServer {
+
+    private boolean closed;
+
+    public OpenSocketFileTestServer(int port) {
+        super(port);
+    }
+
+    @Override
+    public void start() throws IOException {
+        serverSocket = new ServerSocket(port);
+        listenerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!serverSocket.isClosed()) {
+                    try {
+                        Socket socket = serverSocket.accept();
+                        new Thread(new SocketThread(socket)).start();
+                    } catch (IOException e) {
+                        // Do nothing. This means the socket was closed for some reason.
+                        // There is nothing to do here except try to close the socket and see if the
+                        // server is still listening!
+                        // This also could be due to the close() call
+                    }
+                }
+            }
+        });
+        listenerThread.start();
+    }
+
+    private class SocketThread implements Runnable {
+        private Socket socket;
+        private OutputStream os;
+
+        public SocketThread(Socket socket) {
+            this.socket = socket;
+        }
+
+        @Override
+        public void run() {
+            try {
+                os = socket.getOutputStream();
+                byte[] chunk = new byte[1024];
+                for (String path : paths) {
+                    try (FileInputStream fin = new FileInputStream(new File(path))) {
+                        int read = fin.read(chunk);
+                        while (read > 0) {
+                            os.write(chunk, 0, read);
+                            read = fin.read(chunk);
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+            } finally {
+                synchronized (serverSocket) {
+                    if (!closed) {
+                        try {
+                            serverSocket.wait();
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                try {
+                    os.close();
+                } catch (Throwable th) {
+                    th.printStackTrace();
+                }
+                try {
+                    socket.close();
+                } catch (Throwable th) {
+                    th.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void stop() throws IOException, InterruptedException {
+        synchronized (serverSocket) {
+            closed = true;
+            try {
+                serverSocket.close();
+                if (listenerThread.isAlive()) {
+                    listenerThread.join();
+                }
+            } finally {
+                serverSocket.notifyAll();
+            }
+        }
+    }
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
new file mode 100644
index 0000000..3312d1b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import org.apache.asterix.test.client.ITestClient;
+import org.apache.asterix.test.client.TestClientProvider;
+
+public class TestClientServer implements ITestServer {
+
+    // port of the server to connect to
+    private final int port;
+    private ITestClient client;
+
+    public TestClientServer(int port) {
+        this.port = port;
+    }
+
+    @Override
+    public void configure(String[] args) throws Exception {
+        client = TestClientProvider.createTestClient(args, port);
+    }
+
+    @Override
+    public void start() throws Exception {
+        client.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        client.stop();
+    }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
index 0be6800..ab8b005 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
@@ -26,6 +26,10 @@
                 return new FileTestServer(port);
             case "rss":
                 return new RSSTestServer(port);
+            case "open-socket-file":
+                return new OpenSocketFileTestServer(port);
+            case "client":
+                return new TestClientServer(port);
             default:
                 throw new Exception("Unknown test server");
         }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
index 1e86f39..f61ecbd 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -25,17 +25,25 @@
 import java.util.Map;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SocketInputStream extends AInputStream {
     private ServerSocket server;
     private Socket socket;
     private InputStream connectionStream;
+    private boolean closed;
 
     public SocketInputStream(ServerSocket server) throws IOException {
         this.server = server;
-        socket = server.accept();
-        connectionStream = socket.getInputStream();
+        socket = new Socket();
+        connectionStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return -1;
+            }
+        };
     }
 
     @Override
@@ -66,9 +74,15 @@
 
     @Override
     public int read(byte b[], int off, int len) throws IOException {
+        if (closed) {
+            return -1;
+        }
         int read = connectionStream.read(b, off, len);
         while (read < 0) {
-            accept();
+            if (!accept()) {
+                closed = true;
+                return -1;
+            }
             read = connectionStream.read(b, off, len);
         }
         return read;
@@ -86,21 +100,55 @@
 
     @Override
     public void close() throws IOException {
-        connectionStream.close();
-        socket.close();
-        server.close();
+        HyracksDataException hde = null;
+        try {
+            if (connectionStream != null) {
+                connectionStream.close();
+            }
+            connectionStream = null;
+        } catch (IOException e) {
+            hde = new HyracksDataException(e);
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+            socket = null;
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        }
+        try {
+            if (server != null) {
+                server.close();
+            }
+            server = null;
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        }
+        if (hde != null) {
+            throw hde;
+        }
     }
 
-    private void accept() throws IOException {
-        connectionStream.close();
-        socket.close();
-        socket = server.accept();
-        connectionStream = socket.getInputStream();
+    private boolean accept() throws IOException {
+        try {
+            connectionStream.close();
+            connectionStream = null;
+            socket.close();
+            socket = null;
+            socket = server.accept();
+            connectionStream = socket.getInputStream();
+            return true;
+        } catch (Exception e) {
+            close();
+            return false;
+        }
     }
 
     @Override
     public boolean stop() throws Exception {
-        return false;
+        close();
+        return true;
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index e39b507..cd4a3c1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -58,7 +58,6 @@
 
     @Override
     public AInputStream getInputStream() throws Exception {
-        twitterServer.start();
         return twitterServer;
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 7e28c35..d0348c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -149,7 +149,7 @@
     private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
         int waitCycleCount = 0;
         ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 10) {
+        while (ingestionRuntime == null && waitCycleCount < 1000) {
             try {
                 Thread.sleep(3000);
                 waitCycleCount++;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 3cb5d64..36c11e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -241,7 +241,8 @@
         FeedRuntimeId runtimeId = null;
         FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
         if (endFeedMessage.isCompleteDisconnection()) {
-            // subscribableRuntimeType represents the location at which the feed connection receives data
+            // subscribableRuntimeType represents the location at which the feed connection receives
+            // data
             FeedRuntimeType runtimeType = null;
             switch (subscribableRuntimeType) {
                 case INTAKE:
@@ -257,15 +258,19 @@
             runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
             CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
                     .getFeedRuntime(connectionId, runtimeId);
-            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            if (feedRuntime != null) {
+                feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            }
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
             }
         } else {
-            // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+            // subscribableRuntimeType represents the location for data hand-off in presence of
+            // subscribers
             switch (subscribableRuntimeType) {
                 case INTAKE:
-                    // illegal state as data hand-off from one feed to another does not happen at intake
+                    // illegal state as data hand-off from one feed to another does not happen at
+                    // intake
                     throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
                 case COMPUTE:
                     // feed could be primary or secondary, doesn't matter

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/660
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message