asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Feed Fixes and Cleanup
Date Thu, 07 Jan 2016 13:56:13 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/574

Change subject: Feed Fixes and Cleanup
......................................................................

Feed Fixes and Cleanup

1. Introduce filesystem feed data source.
2. Fix the order of closing feed stages on disconnection.
3. Added Twitter feed to the compatibility utility to allow
   using it with its alias.
4. First part of the feed log space.
5. Fixed the handling of duplicate key exception but now we have
   a deadlock.

Change-Id: I4e8db26a810efd1fbaa52ceeb3efd0c8328ab070
---
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
M asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
M asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
M asterix-app/src/test/resources/runtimets/testsuite.xml
M asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
M asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
M asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
M asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
M asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
M asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
M asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
M asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
R asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
A asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FileSystemFeedProgressTracker.java
R asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IngestionRuntime.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
A asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
M asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
M asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/runtime/GenericSocketFeedAdapter.java
M asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
A asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
C asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
A asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
A asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
R asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
D asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
M asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
M asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
R asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MetadataUtil.java
71 files changed, 1,664 insertions(+), 392 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/74/574/1

diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 7c055e2..14d8705 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -139,7 +139,7 @@
 import org.apache.asterix.metadata.entities.PrimaryFeed;
 import org.apache.asterix.metadata.entities.SecondaryFeed;
 import org.apache.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
-import org.apache.asterix.metadata.feeds.FeedUtil;
+import org.apache.asterix.metadata.feeds.MetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.MetadataLockManager;
@@ -2103,10 +2103,10 @@
             CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(),
                     cfs.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
 
-            FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
+            MetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
                     metadataProvider.getMetadataTxnContext());
 
-            Feed feed = FeedUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(),
+            Feed feed = MetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(),
                     metadataProvider.getMetadataTxnContext());
 
             feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
@@ -2116,7 +2116,7 @@
                         + cfs.getDatasetName().getValue());
             }
 
-            FeedPolicy feedPolicy = FeedUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx);
+            FeedPolicy feedPolicy = MetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx);
 
             // All Metadata checks have passed. Feed connect request is valid. //
 
@@ -2286,8 +2286,8 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-        FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
-        Feed feed = FeedUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
+        MetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
+        Feed feed = MetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
 
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
         boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId);
@@ -2360,7 +2360,7 @@
 
         try {
 
-            JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
+            JobSpecification alteredJobSpec = MetadataUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
                     bfs.getSubscriptionRequest().getPolicyParameters());
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
index cb3133e..537f4e7 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
@@ -44,7 +44,7 @@
 import org.apache.asterix.common.feeds.message.ScaleInReportMessage;
 import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
 import org.apache.asterix.file.FeedOperations;
-import org.apache.asterix.metadata.feeds.FeedUtil;
+import org.apache.asterix.metadata.feeds.MetadataUtil;
 import org.apache.asterix.metadata.feeds.PrepareStallMessage;
 import org.apache.asterix.metadata.feeds.TerminateDataFlowMessage;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
@@ -117,7 +117,7 @@
                 List<String> newLocations = new ArrayList<String>();
                 newLocations.addAll(currentComputeLocations);
                 newLocations.addAll(helperComputeNodes);
-                FeedUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
+                MetadataUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
 
                 // Step 2) send prepare to  stall message
                 gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
@@ -161,7 +161,7 @@
             List<String> currentComputeLocations = new ArrayList<String>();
             currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
                     .getFeedId()));
-            FeedUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
+            MetadataUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
                     currentComputeLocations);
 
             gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index 01775ab..5504af1 100755
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -35,6 +35,8 @@
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.library.ExternalLibrary;
 import org.apache.asterix.external.library.ExternalLibraryManager;
@@ -44,7 +46,6 @@
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Library;
 import org.apache.asterix.metadata.feeds.AdapterIdentifier;
@@ -210,7 +211,7 @@
                     String adapterFactoryClass = adapter.getFactoryClass().trim();
                     String adapterName = libraryName + "#" + adapter.getName().trim();
                     AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
-                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, AdapterType.EXTERNAL);
+                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, IDataSourceAdapter.AdapterType.EXTERNAL);
                     MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("Installed adapter: " + adapterName);
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index f981aca..f3e4605 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 430fd8e..4d7b661 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -26,6 +26,82 @@
         ResultOffsetPath="results"
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
+    <test-group name="feeds">
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_01">
+                <output-dir compare="Text">feeds_01</output-dir>
+            </compilation-unit>
+        </test-case>
+        <!--Disable it because of sporadic failures. Abdullah will re-enable it.
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_02">
+                <output-dir compare="Text">feeds_02</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_03">
+                <output-dir compare="Text">feeds_03</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_04">
+                <output-dir compare="Text">feeds_04</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+          <compilation-unit name="feeds_06">
+            <output-dir compare="Text">feeds_06</output-dir>
+          </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_07">
+                <output-dir compare="Text">feeds_07</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_08">
+                <output-dir compare="Text">feeds_08</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_09">
+                <output-dir compare="Text">feeds_09</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_10">
+                <output-dir compare="Text">feeds_10</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_11">
+                <output-dir compare="Text">feeds_11</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_12">
+                <output-dir compare="Text">feeds_12</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="issue_230_feeds">
+                <output-dir compare="Text">issue_230_feeds</output-dir>
+            </compilation-unit>
+        </test-case>
+
+        <test-case FilePath="feeds">
+            <compilation-unit name="issue_711_feeds">
+                <output-dir compare="Text">issue_711_feeds</output-dir>
+            </compilation-unit>
+        </test-case>
+        -->
+
+    </test-group>
     <test-group name="flwor">
         <test-case FilePath="flwor">
             <compilation-unit name="at00">
@@ -6114,83 +6190,6 @@
                 <output-dir compare="Text">issue_251_dataset_hint_7</output-dir>
             </compilation-unit>
         </test-case>
-    </test-group>
-    <test-group name="feeds">
-
-        <!--Disable it because of sporadic failures. Raman will re-enable it.
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_01">
-                <output-dir compare="Text">feeds_01</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_02">
-                <output-dir compare="Text">feeds_02</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_03">
-                <output-dir compare="Text">feeds_03</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_04">
-                <output-dir compare="Text">feeds_04</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-          <compilation-unit name="feeds_06">
-            <output-dir compare="Text">feeds_06</output-dir>
-          </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_07">
-                <output-dir compare="Text">feeds_07</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_08">
-                <output-dir compare="Text">feeds_08</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_09">
-                <output-dir compare="Text">feeds_09</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_10">
-                <output-dir compare="Text">feeds_10</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_11">
-                <output-dir compare="Text">feeds_11</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_12">
-                <output-dir compare="Text">feeds_12</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="issue_230_feeds">
-                <output-dir compare="Text">issue_230_feeds</output-dir>
-            </compilation-unit>
-        </test-case>
-
-        <test-case FilePath="feeds">
-            <compilation-unit name="issue_711_feeds">
-                <output-dir compare="Text">issue_711_feeds</output-dir>
-            </compilation-unit>
-        </test-case>
-        -->
-
     </test-group>
     <test-group name="hdfs">
       <test-case FilePath="hdfs">
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index fd1ebb8..d25e51f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.FrameDataException;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -41,6 +42,7 @@
 
     private final boolean isPrimary;
     private AbstractLSMIndex lsmIndex;
+    private int i = 0;
 
     public boolean isPrimary() {
         return isPrimary;
@@ -85,7 +87,7 @@
         ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
         try {
-            for (int i = 0; i < tupleCount; i++) {
+            for (; i < tupleCount; i++) {
                 if (tupleFilter != null) {
                     frameTuple.reset(accessor, i);
                     if (!tupleFilter.accept(frameTuple)) {
@@ -117,11 +119,13 @@
                 }
             }
         } catch (Throwable th) {
-            throw new HyracksDataException(th);
+            FrameDataException fde = new FrameDataException(i, th);
+            throw fde;
         }
         writeBuffer.ensureFrameSize(buffer.capacity());
         FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
         FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+        i = 0;
     }
 
     @Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java b/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
index 136a196..18b5264 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
@@ -26,7 +26,7 @@
 
     private final int tupleIndex;
 
-    public FrameDataException(int tupleIndex, Exception cause) {
+    public FrameDataException(int tupleIndex, Throwable cause) {
         super(cause);
         this.tupleIndex = tupleIndex;
     }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
index f1728ce..8eb12d6 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.feeds;
 
-import java.io.DataInputStream;
 import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,7 +30,6 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public class FeedExceptionHandler implements IExceptionHandler {
 
@@ -52,6 +50,7 @@
         this.connectionId = connectionId;
     }
 
+    @Override
     public ByteBuffer handleException(Exception e, ByteBuffer frame) {
         try {
             if (e instanceof FrameDataException) {
@@ -83,15 +82,15 @@
     }
 
     private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
-
+        /*
         ByteBufferInputStream bbis = new ByteBufferInputStream();
         DataInputStream di = new DataInputStream(bbis);
-
+        
         int start = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength();
         bbis.setByteBuffer(fta.getBuffer(), start);
-
+        
         Object[] record = new Object[recordDesc.getFieldCount()];
-
+        
         for (int i = 0; i < record.length; ++i) {
             Object instance = recordDesc.getFields()[i].deserialize(di);
             if (i == 0) {
@@ -103,6 +102,6 @@
                 }
             }
         }
-
+        */
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
index cd7d598..aed2d6a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
@@ -28,7 +28,12 @@
 
     private static final long serialVersionUID = 1L;
 
-    /** failure configuration **/
+    /**
+     * --------------------------
+     * failure configuration
+     * --------------------------
+     **/
+
     /** continue feed ingestion after a soft (runtime) failure **/
     public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
 
@@ -44,7 +49,15 @@
     /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
     public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
 
-    /** flow control configuration **/
+    /**
+     * --------------------------
+     * flow control configuration
+     * --------------------------
+     **/
+
+    /** enable buffering in feeds **/
+    public static final String BUFFERING_ENABLED = "buffering.enabled";
+
     /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
     public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
 
@@ -111,6 +124,10 @@
     }
 
     /** flow control **/
+    public boolean bufferingEnabled() {
+        return getBooleanPropertyValue(BUFFERING_ENABLED, false);
+    }
+
     public boolean spillToDiskOnCongestion() {
         return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
     }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
index 6642df1..4dae741 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -41,7 +41,10 @@
 
 /**
  * Provides for error-handling and input-side buffering for a feed runtime.
- */
+ * The input handler is buffering in:
+ * 1. FeedMetaComputeNodePushable.initializeNewFeedRuntime();
+ * 2. FeedMetaStoreNodePushable.initializeNewFeedRuntime();
+ **/
 public class FeedRuntimeInputHandler implements IFrameWriter {
 
     private static Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
@@ -49,13 +52,12 @@
     private final FeedConnectionId connectionId;
     private final FeedRuntimeId runtimeId;
     private final FeedPolicyAccessor feedPolicyAccessor;
-    private boolean bufferingEnabled;
     private final IExceptionHandler exceptionHandler;
     private final FeedFrameDiscarder discarder;
     private final FeedFrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
     private final IFeedManager feedManager;
-
+    private boolean bufferingEnabled;
     private IFrameWriter coreOperator;
     private MonitoredBuffer mBuffer;
     private DataBucketPool pool;
@@ -68,6 +70,35 @@
 
     private FrameEventCallback frameEventCallback;
 
+    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IFrameWriter coreOperator, FeedPolicyAccessor fpa, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedManager feedManager, int nPartitions) throws IOException {
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.coreOperator = coreOperator;
+        this.bufferingEnabled = fpa.bufferingEnabled();;
+        this.feedPolicyAccessor = fpa;
+        this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
+        this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this);
+        this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
+        this.mode = Mode.PROCESS;
+        this.lastMode = Mode.PROCESS;
+        this.finished = false;
+        this.fpa = fpa;
+        this.feedManager = feedManager;
+        this.pool = (DataBucketPool) feedManager.getFeedMemoryManager()
+                .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
+        this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
+                .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
+        this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
+        this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
+                feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
+                nPartitions, fpa);
+        this.mBuffer.start();
+        this.throttlingEnabled = false;
+    }
+
+    // befferingEnabled passed as a parameter
     public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
             IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
             RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws IOException {
@@ -245,8 +276,15 @@
         while (!frameProcessed) {
             try {
                 if (!bufferingEnabled) {
-                    coreOperator.nextFrame(frame); // synchronous
-                    mBuffer.sendReport(frame);
+                    if (frame == null) {
+                        setFinished(true);
+                        synchronized (coreOperator) {
+                            coreOperator.notifyAll();
+                        }
+                    } else {
+                        coreOperator.nextFrame(frame); // synchronous
+                        mBuffer.sendReport(frame);
+                    }
                 } else {
                     DataBucket bucket = pool.getDataBucket();
                     if (bucket != null) {
@@ -285,6 +323,7 @@
                 }
                 frameProcessed = true;
             } catch (Exception e) {
+                e.printStackTrace();
                 if (feedPolicyAccessor.continueOnSoftFailure()) {
                     frame = exceptionHandler.handleException(e, frame);
                     if (frame == null) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
index e5a22b5..2660895 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
@@ -212,7 +212,7 @@
     }
 
     protected void preProcessFrame(ByteBuffer frame) throws Exception {
-        if (postProcessor == null) {
+        if (preProcessor == null) {
             preProcessor = getFramePreProcessor();
         }
         if (preProcessor != null) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
index 9dd4e76..a2e2325 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
@@ -30,9 +30,13 @@
  */
 public interface IDataSourceAdapter extends Serializable {
 
+    public enum AdapterType {
+        INTERNAL,
+        EXTERNAL
+    }
+
     /**
      * Triggers the adapter to begin ingesting data from the external source.
-     * 
      * @param partition
      *            The adapter could be running with a degree of parallelism.
      *            partition corresponds to the i'th parallel instance.
@@ -44,17 +48,4 @@
      * @throws Exception
      */
     public void start(int partition, IFrameWriter writer) throws Exception;
-
-    /**
-     * Discontinue the ingestion of data.
-     *
-     * @throws Exception
-     */
-    public boolean stop() throws Exception;
-
-    /**
-     * @param e
-     * @return true if the ingestion should continue post the exception else false
-     */
-    public boolean handleException(Throwable e);
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
index 4067508..4a8b631 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
@@ -19,6 +19,4 @@
 package org.apache.asterix.common.feeds.api;
 
 public interface ITupleTrackingFeedAdapter extends IDataSourceAdapter {
-
-    public void tuplePersistedTimeCallback(long timestamp);
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 5ee065a..2623ae5 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -32,14 +32,15 @@
     public enum TupleForwardPolicy {
         FRAME_FULL,
         COUNTER_TIMER_EXPIRED,
-        RATE_CONTROLLED
+        RATE_CONTROLLED,
+        FEED
     }
 
     public void configure(Map<String, String> configuration);
 
     public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
 
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException, InterruptedException;
 
     public void close() throws HyracksDataException;
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
similarity index 93%
rename from asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
index 2eb6caa..3c6fca6 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.feeds.api;
+package org.apache.asterix.external.api;
 
 import org.apache.asterix.common.feeds.FeedId;
+import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
 
 public interface IAdapterRuntimeManager {
 
@@ -63,7 +64,7 @@
     /**
      * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
      */
-    public IDataSourceAdapter getFeedAdapter();
+    public IFeedAdapter getFeedAdapter();
 
     /**
      * @return state associated with the AdapterRuntimeManager. See {@code State}.
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index f5f47ec..e5453cf 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -43,7 +43,11 @@
 
     public void start(IFrameWriter writer) throws HyracksDataException;
 
-    public boolean stop();
+    public boolean stop() throws Exception;
+
+    public boolean pause() throws HyracksDataException;
+
+    public boolean resume() throws Exception;
 
     public boolean handleException(Throwable th);
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
new file mode 100644
index 0000000..6f93d3a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFeedAdapter extends IDataSourceAdapter {
+    /**
+     * Pause the ingestion of data.
+     * @throws HyracksDataException
+     * @throws Exception
+     */
+    public boolean pause() throws HyracksDataException;
+
+    /**
+     * Resume the ingestion of data.
+     * @throws HyracksDataException
+     * @throws Exception
+     */
+    public boolean resume() throws HyracksDataException;
+
+    /**
+     * Discontinue the ingestion of data.
+     * @throws Exception
+     */
+    public boolean stop() throws Exception;
+
+    /**
+     * @param e
+     * @return true if the ingestion should continue post the exception else false
+     * @throws Exception
+     */
+    public boolean handleException(Throwable e);
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
index 31d6317..531d050 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.api;
 
 import java.io.DataOutput;
+import java.io.IOException;
 import java.io.InputStream;
 
 public interface IStreamDataParser extends IDataParser {
@@ -30,10 +31,17 @@
     /**
      * Parse data into output AsterixDataModel binary records.
      * Used with parsers that support stream sources
-     *
      * @param out
      *            DataOutput instance that for writing the parser output.
      */
-
     public boolean parse(DataOutput out) throws Exception;
+
+    /**
+     * reset the parser state. this is called when a failure takes place
+     * and the job needs to continue and to do that, the parser need to
+     * be in a consistent state
+     * @return true if reset was successful, false, otherwise
+     * @throws IOException
+     */
+    public boolean reset(InputStream in) throws IOException;
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
new file mode 100644
index 0000000..d6c0113
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public abstract class AbstractFeedDataFlowController implements IDataFlowController {
+    protected FeedTupleForwarder tupleForwarder;
+    protected IHyracksTaskContext ctx;
+    protected Map<String, String> configuration;
+    protected static final int NUMBER_OF_TUPLE_FIELDS = 1;
+    protected ArrayTupleBuilder tb = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
+
+    @Override
+    public ITupleForwarder getTupleForwarder() {
+        return tupleForwarder;
+    }
+
+    @Override
+    public void setTupleForwarder(ITupleForwarder tupleForwarder) {
+        this.tupleForwarder = (FeedTupleForwarder) tupleForwarder;
+    }
+
+    protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
+        tupleForwarder.initialize(ctx, writer);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws IOException {
+        this.configuration = configuration;
+        this.ctx = ctx;
+    }
+
+    public boolean pause() {
+        tupleForwarder.pause();
+        return true;
+    }
+
+    public boolean resume() {
+        tupleForwarder.resume();
+        return true;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
new file mode 100644
index 0000000..54449ae
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordFlowController;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController
+        implements IRecordFlowController<T> {
+    protected IRecordDataParser<T> dataParser;
+    protected IRecordReader<? extends T> recordReader;
+    protected long interval;
+    protected MutableBoolean closed = new MutableBoolean(false);
+
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        try {
+            initializeTupleForwarder(writer);
+            while (recordReader.hasNext()) {
+                IRawRecord<? extends T> record = recordReader.next();
+                if (record == null) {
+                    Thread.sleep(interval);
+                    continue;
+                }
+                tb.reset();
+                dataParser.parse(record, tb.getDataOutput());
+                tb.addFieldEndOffset();
+                tupleForwarder.addTuple(tb);
+            }
+            tupleForwarder.close();
+            recordReader.close();
+            closeSignal();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void closeSignal() {
+        synchronized (closed) {
+            closed.setValue(true);
+            closed.notifyAll();
+        }
+    }
+
+    private void waitForSignal() throws InterruptedException {
+        synchronized (closed) {
+            while (!closed.getValue()) {
+                closed.wait();
+            }
+        }
+    }
+
+    @Override
+    public boolean stop() throws InterruptedException {
+        if (recordReader.stop()) {
+            waitForSignal();
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return true;
+    }
+
+    @Override
+    public void setRecordParser(IRecordDataParser<T> dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    @Override
+    public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
+        this.recordReader = recordReader;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
new file mode 100644
index 0000000..2f32f80
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedStreamDataFlowController extends AbstractFeedDataFlowController implements IStreamFlowController {
+
+    private IStreamDataParser dataParser;
+    private AInputStream stream;
+
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        try {
+            initializeTupleForwarder(writer);
+            while (true) {
+                tb.reset();
+                if (!dataParser.parse(tb.getDataOutput())) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                tupleForwarder.addTuple(tb);
+            }
+            tupleForwarder.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        if (stream.stop()) {
+            stream.close();
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        boolean handled = true;
+        try {
+            handled &= stream.skipError();
+            if (handled) {
+                handled &= dataParser.reset(stream);
+            }
+        } catch (Exception e) {
+            th.addSuppressed(e);
+            return false;
+        }
+        return handled;
+    }
+
+    @Override
+    public void setStreamParser(IStreamDataParser dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    public void setStream(AInputStream stream) {
+        this.stream = stream;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
new file mode 100644
index 0000000..9e449a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class FeedTupleForwarder implements ITupleForwarder {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private boolean paused = false;
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+    }
+
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException, InterruptedException {
+        if (paused) {
+            synchronized (this) {
+                while (paused) {
+                    wait();
+                }
+            }
+        }
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+            if (!success) {
+                throw new IllegalStateException();
+            }
+        }
+
+    }
+
+    public void pause() {
+        paused = true;
+    }
+
+    public synchronized void resume() {
+        paused = false;
+        notifyAll();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index ad8e791..4e43eab 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -57,7 +57,7 @@
 
     @Override
     public boolean stop() {
-        return false;
+        return recordReader.stop();
     }
 
     @Override
@@ -74,4 +74,14 @@
     public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
         this.recordReader = recordReader;
     }
+
+    @Override
+    public boolean pause() throws HyracksDataException {
+        return false;
+    }
+
+    @Override
+    public boolean resume() throws Exception {
+        return false;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 3016470..5acaa36 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -61,4 +61,14 @@
     public void setStreamParser(IStreamDataParser dataParser) {
         this.dataParser = dataParser;
     }
+
+    @Override
+    public boolean pause() throws HyracksDataException {
+        return false;
+    }
+
+    @Override
+    public boolean resume() throws Exception {
+        return false;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 74e98dd..5ec8087 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -18,11 +18,12 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class GenericAdapter implements IDataSourceAdapter {
+public class GenericAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
     private final IDataFlowController controller;
@@ -45,4 +46,14 @@
     public boolean handleException(Throwable e) {
         return controller.handleException(e);
     }
+
+    @Override
+    public boolean pause() throws HyracksDataException {
+        return controller.pause();
+    }
+
+    @Override
+    public boolean resume() throws HyracksDataException {
+        return controller.pause();
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FileSystemFeedProgressTracker.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FileSystemFeedProgressTracker.java
new file mode 100644
index 0000000..7eccac5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FileSystemFeedProgressTracker.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feeds;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.log4j.Logger;
+
+public class FileSystemFeedProgressTracker {
+    private static final Logger LOGGER = Logger.getLogger(FileSystemFeedProgressTracker.class.getName());
+    private FeedLogManager logManager;
+    private ArrayList<String> completed;
+    private int recordNum;
+
+    public FileSystemFeedProgressTracker(FeedLogManager logManager) {
+        this.logManager = logManager;
+    }
+
+    public void init() throws IOException {
+        IOException exception = null;
+        String resumeFile = null;
+        HashSet<String> completed = new HashSet<String>();
+        if (logManager.exists()) {
+            BufferedReader reader = logManager.getReader(FeedLogManager.PROGRESS_LOG_FILE_NAME);
+            try {
+                String log = reader.readLine();
+                while (log != null) {
+                    switch (FeedLogManager.getLogType(log)) {
+                        case COMMIT:
+                            recordNum = Integer.parseInt(log);
+                            break;
+                        case END:
+                            resumeFile = null;
+                            completed.add(FeedLogManager.getSplitId(log));
+                            break;
+                        case START:
+                            resumeFile = FeedLogManager.getSplitId(log);
+                            break;
+                    }
+                    log = reader.readLine();
+                }
+            } catch (Throwable th) {
+                exception = new IOException(th);
+                LOGGER.error("Error reading log file", th);
+            }
+            try {
+                reader.close();
+            } catch (Throwable th) {
+                LOGGER.error("Open file couldn't be closed", th);
+                if (exception != null) {
+                    exception.addSuppressed(th);
+                } else {
+                    exception = new IOException(th);
+                }
+                throw exception;
+            }
+        } else {
+            logManager.create();
+            return;
+        }
+        /*
+         * Done processing the progress log file. We now have:
+         * 1. the files that were completed.
+         * 2. the file we stopped at and at which record did we stop.
+         * 
+         * First, we remove the files which were processed.
+         * Then we make sure the first file in the linked list is the file which we stopped at. 
+         * skipping processed records should be done by the upstream reader since this is format dependant 
+         */
+        /*
+        if (it == null)
+            return;
+        File startedFile = null;
+        while (it.hasNext()) {
+            File file = it.next();
+            if (completed.contains(file.getAbsolutePath())) {
+                it.remove();
+            } else if (file.getAbsolutePath().equals(resumeFile)) {
+                startedFile = file;
+                it.remove();
+            }
+        }
+        if (startedFile != null) {
+            files.addFirst(startedFile);
+        }
+        // reset the iterator
+        it = files.iterator();
+        */
+    }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IngestionRuntime.java
similarity index 70%
rename from asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IngestionRuntime.java
index 926df39..eec79c0 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IngestionRuntime.java
@@ -16,11 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.feeds;
+package org.apache.asterix.external.feeds;
 
 import java.util.logging.Level;
 
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
+import org.apache.asterix.common.feeds.CollectionRuntime;
+import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
+import org.apache.asterix.common.feeds.FeedFrameCollector;
+import org.apache.asterix.common.feeds.FeedId;
+import org.apache.asterix.common.feeds.FeedPolicyAccessor;
+import org.apache.asterix.common.feeds.FeedRuntimeId;
+import org.apache.asterix.common.feeds.FrameDistributor;
+import org.apache.asterix.common.feeds.SubscribableRuntime;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class IngestionRuntime extends SubscribableRuntime {
@@ -33,11 +41,12 @@
         this.adapterRuntimeManager = adaptorRuntimeManager;
     }
 
+    @Override
     public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
         FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
                 collectionRuntime.getConnectionId());
         collectionRuntime.setFrameCollector(reader);
-        
+
         if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
             adapterRuntimeManager.start();
         }
@@ -47,7 +56,21 @@
         }
     }
 
+    @Override
     public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
+            }
+            adapterRuntimeManager.stop();
+        } else {
+            dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
+        }
+        subscribers.remove(collectionRuntime);
+        /*
         dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
@@ -59,6 +82,7 @@
             adapterRuntimeManager.stop();
         }
         subscribers.remove(collectionRuntime);
+        */
     }
 
     public void endOfFeed() {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
index 3b59b98..93ba0a0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
@@ -37,6 +37,7 @@
     protected int bufferLength = 0;
     protected int bufferPosn = 0;
     protected IExternalIndexer indexer;
+    protected boolean done = false;
 
     @Override
     public IRawRecord<char[]> next() throws IOException {
@@ -45,7 +46,10 @@
 
     @Override
     public void close() throws IOException {
-        reader.close();
+        if (!done) {
+            reader.close();
+        }
+        done = true;
     }
 
     public void setInputStream(AInputStream inputStream) throws IOException {
@@ -72,4 +76,15 @@
     public void setIndexer(IExternalIndexer indexer) {
         this.indexer = indexer;
     }
+
+    @Override
+    public boolean stop() {
+        try {
+            reader.stop();
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
index 9b11df6..2b33d7a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
@@ -32,6 +32,9 @@
 
     @Override
     public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
         /* We're reading data from in, but the head of the stream may be
          * already buffered in buffer, so we have several cases:
          * 1. No newline characters are in the buffer, so we need to copy
@@ -63,7 +66,7 @@
                         recordNumber++;
                         return true;
                     }
-                    reader.close();
+                    close();
                     return false; //EOF
                 }
             }
@@ -89,11 +92,6 @@
         } while (newlineLength == 0);
         recordNumber++;
         return true;
-    }
-
-    @Override
-    public boolean stop() {
-        return false;
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
index 668876e..49e67e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
@@ -44,6 +44,9 @@
 
     @Override
     public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
         newlineLength = 0;
         prevCharCR = false;
         prevCharEscape = false;
@@ -65,6 +68,7 @@
                             recordNumber++;
                             return true;
                         }
+                        close();
                         return false;
                     }
                 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
index 9864805..84c96d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
@@ -67,6 +67,9 @@
 
     @Override
     public boolean hasNext() throws Exception {
+        if (done) {
+            return false;
+        }
         record.reset();
         boolean hasStarted = false;
         boolean hasFinished = false;
@@ -79,6 +82,7 @@
                 startPosn = bufferPosn = 0;
                 bufferLength = reader.read(inputBuffer);
                 if (bufferLength <= 0) {
+                    close();
                     return false; // EOF
                 }
             }
@@ -142,6 +146,12 @@
 
     @Override
     public boolean stop() {
-        return false;
+        try {
+            reader.stop();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
     }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
index e7c141d..6606950 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
@@ -38,11 +38,16 @@
     private LinkedBlockingQueue<Status> inputQ;
     private TwitterStream twitterStream;
     private GenericRecord<Status> record;
+    private boolean closed;
 
     @Override
     public void close() throws IOException {
-        twitterStream.clearListeners();
-        twitterStream.cleanUp();
+        if (!closed) {
+            twitterStream.clearListeners();
+            twitterStream.cleanUp();
+            twitterStream = null;
+            closed = true;
+        }
     }
 
     @Override
@@ -61,7 +66,7 @@
 
     @Override
     public boolean hasNext() throws Exception {
-        return true;
+        return !closed;
     }
 
     @Override
@@ -81,7 +86,12 @@
 
     @Override
     public boolean stop() {
-        return false;
+        try {
+            close();
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
     }
 
     private class TweetListener implements StatusListener {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
index 72aaa37..6840c11 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
@@ -97,7 +97,7 @@
             pull = false;
         } else {
             throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
-                    + ExternalDataConstants.KEY_PUSH + "must be specified as part of adaptor configuration");
+                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
         }
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index e573f74..3dbef9a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.stream;
 
+import java.io.IOException;
 import java.io.InputStreamReader;
 
 public class AInputStreamReader extends InputStreamReader {
@@ -31,4 +32,16 @@
     public boolean skipError() throws Exception {
         return in.skipError();
     }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    public void stop() throws IOException {
+        try {
+            in.stop();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
index b3ad1c3..8f4c094 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
@@ -63,7 +63,7 @@
                 }
             } else if (value.getLength() == pos) {
                 pos++;
-                return ExternalDataConstants.EOL;
+                return ExternalDataConstants.BYTE_LF;
             }
             return value.getBytes()[pos++];
         }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
index b511617..7fc9a19 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
@@ -18,11 +18,8 @@
  */
 package org.apache.asterix.external.input.stream;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.file.Path;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
@@ -31,26 +28,23 @@
 
 public class LocalFSInputStreamProvider implements IInputStreamProvider {
 
-    private FileSplit[] fileSplits;
-    private int partition;
+    private String expression;
+    private boolean isFeed;
+    private Path path;
 
     public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
-            Map<String, String> configuration, int partition) {
-        this.partition = partition;
-        this.fileSplits = fileSplits;
+            Map<String, String> configuration, int partition, String expression, boolean isFeed) {
+        this.expression = expression;
+        this.isFeed = isFeed;
+        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
     }
 
     @Override
     public AInputStream getInputStream() throws Exception {
-        FileSplit split = fileSplits[partition];
-        File inputFile = split.getLocalFile().getFile();
-        InputStream in;
         try {
-            in = new FileInputStream(inputFile);
-            return new BasicInputStream(in);
-        } catch (FileNotFoundException e) {
+            return new LocalFileSystemInputStream(path, expression, null, isFeed);
+        } catch (Exception e) {
             throw new IOException(e);
         }
     }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
new file mode 100644
index 0000000..2076e76
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FileSystemWatcher;
+
+public class LocalFileSystemInputStream extends AInputStream {
+    private final FileSystemWatcher watcher;
+    private FileInputStream in;
+    private byte lastByte;
+
+    public LocalFileSystemInputStream(Path inputResource, String expression, FeedLogManager logManager, boolean isFeed)
+            throws IOException {
+        this.watcher = new FileSystemWatcher(logManager, inputResource, expression, isFeed);
+        this.watcher.init();
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException ioe = null;
+        if (in != null) {
+            try {
+                closeFile();
+            } catch (Exception e) {
+                ioe = new IOException(e);
+            }
+        }
+        try {
+            watcher.close();
+        } catch (Exception e) {
+            if (ioe == null) {
+                throw e;
+            }
+            ioe.addSuppressed(e);
+            throw ioe;
+        }
+    }
+
+    private void closeFile() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current input stream and opens the next one, if any.
+     */
+    private boolean advance() throws IOException {
+        closeFile();
+        if (watcher.hasNext()) {
+            in = new FileInputStream(watcher.next());
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (in == null) {
+            if (!advance()) {
+                return -1;
+            }
+        }
+        int result = in.read();
+        while (result < 0 && advance()) {
+            // return a new line at the end of every file <--Might create problems for some cases depending on the parser implementation-->
+            if (lastByte != ExternalDataConstants.BYTE_LF && lastByte != ExternalDataConstants.BYTE_LF) {
+                lastByte = ExternalDataConstants.BYTE_LF;
+                return ExternalDataConstants.BYTE_LF;
+            }
+            result = in.read();
+        }
+        lastByte = (byte) result;
+        return result;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (in == null) {
+            if (!advance()) {
+                return -1;
+            }
+        }
+        int result = in.read(b, off, len);
+        while (result < 0 && advance()) {
+            // return a new line at the end of every file <--Might create problems for some cases depending on the parser implementation-->
+            if (lastByte != ExternalDataConstants.BYTE_LF && lastByte != ExternalDataConstants.BYTE_LF) {
+                lastByte = ExternalDataConstants.BYTE_LF;
+                b[off] = ExternalDataConstants.BYTE_LF;
+                return 1;
+            }
+            // recursive call
+            result = in.read(b, off, len);
+        }
+        if (result > 0) {
+            lastByte = b[off + result - 1];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean skipError() throws Exception {
+        advance();
+        return true;
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        watcher.close();
+        return true;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 14c712a..7a740ec 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -29,8 +29,9 @@
 import org.apache.asterix.external.api.INodeResolver;
 import org.apache.asterix.external.api.INodeResolverFactory;
 import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
-import org.apache.asterix.external.util.DNSResolverFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.NodeResolverFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -41,15 +42,17 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
+    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
     protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamProviderFactory.class.getName());
     protected static INodeResolver nodeResolver;
     protected Map<String, String> configuration;
     protected FileSplit[] fileSplits;
+    protected boolean isFeed;
+    protected String expression;
 
     @Override
     public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
-        return new LocalFSInputStreamProvider(fileSplits, ctx, configuration, partition);
+        return new LocalFSInputStreamProvider(fileSplits, ctx, configuration, partition, expression, isFeed);
     }
 
     @Override
@@ -67,6 +70,8 @@
         this.configuration = configuration;
         String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
         configureFileSplits(splits);
+        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.waitForData(configuration);
+        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 860d35f..129b62f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -21,6 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -1145,4 +1146,10 @@
         recordBuilderPool.reset();
         abvsBuilderPool.reset();
     }
+
+    @Override
+    public boolean reset(InputStream in) throws IOException {
+        admLexer.reInit(new InputStreamReader(in));
+        return true;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 146064a..6c399c3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -198,11 +198,17 @@
     }
 
     @Override
-    public void setInputStream(InputStream in) throws Exception {
+    public void setInputStream(InputStream in) throws IOException {
         cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
         if (in != null && hasHeader) {
             cursor.nextRecord();
             while (cursor.nextField());
         }
     }
+
+    @Override
+    public boolean reset(InputStream in) throws IOException {
+        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
+        return true;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 649ca43..373804a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
 
@@ -57,6 +58,9 @@
         adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericSocketFeedAdapterFactory.class);
         adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, SocketClientAdapterFactory.class);
         adapterFactories.put(ExternalDataConstants.ALIAS_FILE_FEED_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_PULL_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_PUSH_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_LOCALFS_PUSH_ADAPTER, GenericAdapterFactory.class);
 
         // Compatability
         adapterFactories.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class);
@@ -66,11 +70,19 @@
 
     public static IAdapterFactory getAdapterFactory(String adapterClassname, Map<String, String> configuration,
             ARecordType itemType) throws Exception {
+        return AdapterFactoryProvider.getAdapterFactory(adapterClassname, configuration, itemType, false);
+    }
+
+    public static IAdapterFactory getAdapterFactory(String adapterClassname, Map<String, String> configuration,
+            ARecordType itemType, boolean isFeed) throws Exception {
         ExternalDataCompatibilityUtils.addCompatabilityParameters(adapterClassname, itemType, configuration);
         if (!adapterFactories.containsKey(adapterClassname)) {
             throw new AsterixException("Unknown adapter: " + adapterClassname);
         }
         IAdapterFactory adapterFactory = adapterFactories.get(adapterClassname).newInstance();
+        if (isFeed) {
+            ExternalDataUtils.prepareFeed(configuration);
+        }
         adapterFactory.configure(configuration, itemType);
         return adapterFactory;
     }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 68a3942..dfe7aed 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -28,14 +28,19 @@
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordFlowController;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
+import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
@@ -60,9 +65,11 @@
             Map<String, String> configuration, boolean indexingOp) throws Exception {
         switch (dataSourceFactory.getDataSourceType()) {
             case RECORDS:
-                RecordDataFlowController recordDataFlowController;
+                IRecordFlowController recordDataFlowController = null;
                 if (indexingOp) {
                     recordDataFlowController = new IndexingDataFlowController();
+                } else if (ExternalDataUtils.isFeed(configuration)) {
+                    recordDataFlowController = new FeedRecordDataFlowController();
                 } else {
                     recordDataFlowController = new RecordDataFlowController();
                 }
@@ -77,7 +84,12 @@
                 recordDataFlowController.setRecordParser(dataParser);
                 return recordDataFlowController;
             case STREAM:
-                StreamDataFlowController streamDataFlowController = new StreamDataFlowController();
+                IStreamFlowController streamDataFlowController = null;
+                if (ExternalDataUtils.isFeed(configuration)) {
+                    streamDataFlowController = new FeedStreamDataFlowController();
+                } else {
+                    streamDataFlowController = new StreamDataFlowController();
+                }
                 streamDataFlowController.configure(configuration, ctx);
                 streamDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
                 IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index c69e12c..a7ab062 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
 import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -92,8 +93,12 @@
                             .setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory(
                                     ExternalDataUtils.getRecordReaderStreamName(configuration), configuration));;
                     break;
+                case ExternalDataConstants.READER_TWITTER_PULL:
+                case ExternalDataConstants.READER_TWITTER_PUSH:
+                    readerFactory = new TwitterRecordReaderFactory();
+                    break;
                 default:
-                    throw new AsterixException("unknown input stream factory");
+                    throw new AsterixException("unknown record reader factory");
             }
         }
         return readerFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/GenericSocketFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/GenericSocketFeedAdapter.java
index dcf3b51..3a570d8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/GenericSocketFeedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/GenericSocketFeedAdapter.java
@@ -25,13 +25,14 @@
 import java.util.logging.Level;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class GenericSocketFeedAdapter extends StreamBasedAdapter {
+public class GenericSocketFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -115,4 +116,16 @@
             return false;
         }
     }
+
+    @Override
+    public boolean pause() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean resume() {
+        // TODO Auto-generated method stub
+        return false;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
index db38c12..a1c2a73 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
@@ -25,10 +25,10 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 
-public class SocketClientAdapter implements IDataSourceAdapter {
+public class SocketClientAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -103,4 +103,16 @@
         return false;
     }
 
+    @Override
+    public boolean pause() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean resume() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index ea13f25..fa4ec5d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.asterix.common.parse.ITupleForwarder.TupleForwardPolicy;
 import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder;
+import org.apache.asterix.external.dataflow.FeedTupleForwarder;
 import org.apache.asterix.external.dataflow.FrameFullTupleForwarder;
 import org.apache.asterix.external.dataflow.RateControlledTupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -46,12 +47,17 @@
         ITupleForwarder policy = null;
         ITupleForwarder.TupleForwardPolicy policyType = null;
         String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
-        if (propValue == null) {
+        if (ExternalDataUtils.isFeed(configuration)) {
+            policyType = TupleForwardPolicy.FEED;
+        } else if (propValue == null) {
             policyType = TupleForwardPolicy.FRAME_FULL;
         } else {
             policyType = TupleForwardPolicy.valueOf(propValue.trim().toUpperCase());
         }
         switch (policyType) {
+            case FEED:
+                policy = new FeedTupleForwarder();
+                break;
             case FRAME_FULL:
                 policy = new FrameFullTupleForwarder();
                 break;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index a544638..17b194c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -22,7 +22,6 @@
 
     /*
         The following assumes this DDL (but ignoring the field name orders):
-
         create type TwitterUser if not exists as open{
             screen_name: string,
             language: string,
@@ -31,7 +30,6 @@
             name: string,
             followers_count: int32
         };
-
         create type Tweet if not exists as open{
             id: string,
             user: TwitterUser,
@@ -40,7 +38,6 @@
             created_at:string,
             message_text:string
         };
-
     */
     public static class Tweet {
         public static final String ID = "id";
@@ -62,10 +59,8 @@
 
     }
 
-
     /*
         The following assumes this DDL (but ignoring the field name orders):
-
         create type ProcessedTweet if not exists as open {
             id: string,
             user_name:string,
@@ -75,7 +70,6 @@
             country: string,
             topics: [string]
         };
-
     */
     public static final class ProcessedTweet {
         public static final String USER_NAME = "user_name";
@@ -83,5 +77,4 @@
         public static final String TOPICS = "topics";
     }
 
-
-}
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
index 7f91a2b..c785ab6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -51,6 +51,7 @@
     //TODO:Add remaining aliases
     public static void addCompatabilityParameters(String adapterClassname, ARecordType itemType,
             Map<String, String> configuration) throws AsterixException {
+        // HDFS
         if (adapterClassname.equals(ExternalDataConstants.ALIAS_HDFS_ADAPTER)
                 || adapterClassname.equalsIgnoreCase(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME)) {
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -65,21 +66,45 @@
                 configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_HDFS_ADAPTER);
             }
         }
+
+        // Local Filesystem
         if (adapterClassname.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)
-                || adapterClassname.contains(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME)) {
+                || adapterClassname.contains(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME)
+                || adapterClassname.contains(ExternalDataConstants.ALIAS_LOCALFS_PUSH_ADAPTER)) {
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
                 throw new AsterixException("Unspecified format parameter for local file system adapter");
             }
             configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT));
             configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER);
         }
+
+        // Twitter (Pull)
+        if (adapterClassname.equals(ExternalDataConstants.ALIAS_TWITTER_PULL_ADAPTER)) {
+            configuration.put(ExternalDataConstants.KEY_READER, ExternalDataConstants.READER_TWITTER_PULL);
+            configuration.put(ExternalDataConstants.KEY_PULL, ExternalDataConstants.TRUE);
+            ExternalDataUtils.setRecordFormat(configuration, ExternalDataConstants.FORMAT_TWEET);
+        }
+
+        // Twitter (Push)
+        if (adapterClassname.equals(ExternalDataConstants.ALIAS_TWITTER_PUSH_ADAPTER)) {
+            configuration.put(ExternalDataConstants.KEY_READER, ExternalDataConstants.READER_TWITTER_PUSH);
+            configuration.put(ExternalDataConstants.KEY_PUSH, ExternalDataConstants.TRUE);
+            ExternalDataUtils.setRecordFormat(configuration, ExternalDataConstants.FORMAT_TWEET);
+        }
+
+        // Hive Parser
         if (configuration.get(ExternalDataConstants.KEY_PARSER) != null
                 && configuration.get(ExternalDataConstants.KEY_PARSER).equals(ExternalDataConstants.PARSER_HIVE)) {
             configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_HIVE);
         }
+
+        // FileSystem for Feed adapter
         if (configuration.get(ExternalDataConstants.KEY_FILESYSTEM) != null) {
             configuration.put(ExternalDataConstants.KEY_STREAM,
                     configuration.get(ExternalDataConstants.KEY_FILESYSTEM));
+            if (adapterClassname.equalsIgnoreCase(ExternalDataConstants.ALIAS_FILE_FEED_ADAPTER)) {
+                configuration.put(ExternalDataConstants.KEY_WAIT_FOR_DATA, ExternalDataConstants.FALSE);
+            }
         }
     }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 2050e6a..cf6a608 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -70,6 +70,8 @@
     public static final String KEY_INTERVAL = "interval";
     public static final String KEY_PULL = "pull";
     public static final String KEY_PUSH = "push";
+    public static final String KEY_IS_FEED = "is-feed";
+    public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
     /**
      * HDFS class names
      */
@@ -94,6 +96,8 @@
     public static final String READER_ADM = "adm";
     public static final String READER_SEMISTRUCTURED = "semi-structured";
     public static final String READER_DELIMITED = "delimited-text";
+    public static final String READER_TWITTER_PUSH = "twitter-push";
+    public static final String READER_TWITTER_PULL = "twitter-pull";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
@@ -128,6 +132,7 @@
      */
     public static final String ALIAS_GENERIC_ADAPTER = "adapter";
     public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
+    public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
     public static final String ALIAS_HDFS_ADAPTER = "hdfs";
     public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
     public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
@@ -136,7 +141,6 @@
     public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
     public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
     public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
-    public static final String ALIAS_TWITTER_AZURE_ADAPTER = "azure_twitter";
     public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
 
     /**
@@ -144,6 +148,12 @@
      */
     public static final String ADAPTER_LOCALFS_CLASSNAME = "org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter";
     public static final String ADAPTER_HDFS_CLASSNAME = "org.apache.asterix.external.dataset.adapter.HDFSAdapter";
+
+    /**
+     * Constant String values
+     */
+    public static final String TRUE = "true";
+    public static final String FALSE = "false";
 
     /**
      * Constant characters
@@ -160,7 +170,7 @@
     /**
      * Constant byte characters
      */
-    public static final byte EOL = '\n';
+    public static final byte BYTE_LF = '\n';
     public static final byte BYTE_CR = '\r';
     /**
      * Size default values
@@ -172,5 +182,4 @@
      * Expected parameter values
      */
     public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
index 9dcaef4..e77e121 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.util;
 
+import java.util.Arrays;
+
 public class ExternalDataExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
     public static final String MISSING_PARAMETER = "Missing parameter.\n";
@@ -29,4 +31,8 @@
         return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
                 + expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
     }
+
+    public static String concat(String... vals) {
+        return Arrays.toString(vals);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7c1c1b5..a545930 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -134,6 +134,15 @@
         return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
     }
 
+    public static void setRecordFormat(Map<String, String> configuration, String format) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
+            configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+            configuration.put(ExternalDataConstants.KEY_FORMAT, format);
+        }
+    }
+
     private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
 
     private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
@@ -219,4 +228,35 @@
                         .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
                 .newInstance();
     }
+
+    public static boolean isFeed(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            return false;
+        } else {
+            return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_FEED));
+        }
+    }
+
+    public static void prepareFeed(Map<String, String> configuration) throws AsterixException {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
+        } else {
+            if (!(configuration.get(ExternalDataConstants.KEY_IS_FEED).equalsIgnoreCase(ExternalDataConstants.TRUE)
+                    || configuration.get(ExternalDataConstants.KEY_IS_FEED)
+                            .equalsIgnoreCase(ExternalDataConstants.FALSE))) {
+                throw new AsterixException(
+                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_IS_FEED,
+                                ExternalDataExceptionUtils.concat(ExternalDataConstants.TRUE,
+                                        ExternalDataConstants.FALSE),
+                        configuration.get(ExternalDataConstants.KEY_IS_FEED)));
+            }
+        }
+    }
+
+    public static boolean waitForData(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_WAIT_FOR_DATA)) {
+            return true;
+        }
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_WAIT_FOR_DATA));
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
new file mode 100644
index 0000000..02df398
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+public class FeedLogManager {
+
+    public enum LogEntryType {
+        START,
+        END,
+        COMMIT
+    }
+
+    public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
+    public static final String ERROR_LOG_FILE_NAME = "error.log";
+    public static final String FAULTY_RECORDS_FILE_NAME = "bad_records.log";
+    public static final String START_PREFIX = "s:";
+    public static final String END_PREFIX = "e:";
+    public static final int PREFIX_SIZE = 2;
+    private long recNumber;
+
+    public boolean exists() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public BufferedReader getReader(String progressLogFileName) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public void create() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void setSplit(String split) {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void log(String string, IOException e) {
+        // TODO Auto-generated method stub
+
+    }
+
+    public long getRecNumber() {
+        return recNumber;
+    }
+
+    public void setRecNumber(long recNumber) {
+        this.recNumber = recNumber;
+    }
+
+    /**
+     * Static methods from here on
+     */
+    public static LogEntryType getLogType(String log) {
+        if (log.startsWith(START_PREFIX)) {
+            return LogEntryType.START;
+        } else if (log.startsWith(END_PREFIX)) {
+            return LogEntryType.END;
+        } else {
+            return LogEntryType.COMMIT;
+        }
+    }
+
+    public static String getSplitId(String log) {
+        return log.substring(PREFIX_SIZE);
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
similarity index 67%
copy from asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
copy to asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index f8585bb..12a98ed 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -18,19 +18,6 @@
  */
 package org.apache.asterix.external.util;
 
-import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.external.api.INodeResolverFactory;
-
-/**
- * Factory for creating instance of {@link NodeResolver}
- */
-public class DNSResolverFactory implements INodeResolverFactory {
-
-    private static final INodeResolver INSTANCE = new NodeResolver();
-
-    @Override
-    public INodeResolver createNodeResolver() {
-        return INSTANCE;
-    }
+public class FeedUtils {
 
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
new file mode 100644
index 0000000..fc862fa
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FileSystemWatcher {
+
+    private static Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
+    private final WatchService watcher;
+    private final HashMap<WatchKey, Path> keys;
+    private final LinkedList<File> files = new LinkedList<File>();
+    private Iterator<File> it;
+    private final String expression;
+    private final FeedLogManager logManager;
+    private final Path path;
+    private final boolean isFeed;
+    private boolean done;
+
+    public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed)
+            throws IOException {
+        this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null;
+        this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
+        this.logManager = logManager;
+        this.expression = expression;
+        this.path = inputResource;
+        this.isFeed = isFeed;
+    }
+
+    public void init() throws IOException {
+        LinkedList<Path> dirs = null;
+        dirs = new LinkedList<Path>();
+        LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
+        it = files.iterator();
+        if (isFeed) {
+            for (Path path : dirs) {
+                register(path);
+            }
+            resume();
+        }
+    }
+
+    /**
+     * Register the given directory, and all its sub-directories, with the
+     * WatchService.
+     */
+    private void register(Path dir) throws IOException {
+        WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+                StandardWatchEventKinds.ENTRY_MODIFY);
+        keys.put(key, dir);
+    }
+
+    private void resume() throws IOException {
+        if (logManager == null) {
+            return;
+        }
+        IOException exception = null;
+        // Get log reader
+        String resumeFile = null;
+        HashSet<String> completed = new HashSet<String>();
+        if (logManager.exists()) {
+            BufferedReader reader = logManager.getReader(FeedLogManager.PROGRESS_LOG_FILE_NAME);
+            long recordNum = 0L;
+            try {
+                String log = reader.readLine();
+                while (log != null) {
+                    switch (FeedLogManager.getLogType(log)) {
+                        case COMMIT:
+                            recordNum = Long.parseLong(log);
+                            break;
+                        case END:
+                            resumeFile = null;
+                            completed.add(FeedLogManager.getSplitId(log));
+                            break;
+                        case START:
+                            resumeFile = FeedLogManager.getSplitId(log);
+                            break;
+                    }
+                    log = reader.readLine();
+                }
+            } catch (Throwable th) {
+                exception = new IOException(th);
+            }
+            try {
+                logManager.setRecNumber(recordNum);
+                reader.close();
+            } catch (Throwable th) {
+                if (exception != null) {
+                    exception.addSuppressed(th);
+                } else {
+                    exception = new IOException(th);
+                }
+                throw exception;
+            }
+        } else {
+            logManager.create();
+            return;
+        }
+        /*
+         * Done processing the progress log file. We now have:
+         * 1. the files that were completed.
+         * 2. the file we stopped at and at which record did we stop.
+         * 
+         * First, we remove the files which were processed.
+         * Then we make sure the first file in the linked list is the file which we stopped at. 
+         * skipping processed records should be done by the upstream reader since this is format dependant 
+         */
+
+        if (it == null)
+            return;
+        File startedFile = null;
+        while (it.hasNext()) {
+            File file = it.next();
+            if (completed.contains(file.getAbsolutePath())) {
+                // File was read completely, remove it from the files list
+                it.remove();
+            } else if (file.getAbsolutePath().equals(resumeFile)) {
+                startedFile = file;
+                it.remove();
+            }
+        }
+        if (startedFile != null) {
+            files.addFirst(startedFile);
+        }
+        // reset the iterator
+        it = files.iterator();
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
+        return (WatchEvent<T>) event;
+    }
+
+    private void handleEvents(WatchKey key) {
+        // get dir associated with the key
+        Path dir = keys.get(key);
+        if (dir == null) {
+            // This should never happen
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("WatchKey not recognized!!");
+            }
+            return;
+        }
+        for (WatchEvent<?> event : key.pollEvents()) {
+            Kind<?> kind = event.kind();
+            // TODO: Do something about overflow events
+            // An overflow event means that some events were dropped
+            if (kind == StandardWatchEventKinds.OVERFLOW) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Overflow event. Some events might have been missed");
+                }
+                continue;
+            }
+
+            // Context for directory entry event is the file name of entry
+            WatchEvent<Path> ev = cast(event);
+            Path name = ev.context();
+            Path child = dir.resolve(name);
+            // if directory is created then register it and its sub-directories
+            if ((kind == StandardWatchEventKinds.ENTRY_CREATE)) {
+                try {
+                    if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
+                        register(child);
+                    } else {
+                        // it is a file, add it to the files list.
+                        LocalFileSystemUtils.validateAndAdd(child, expression, files);
+                    }
+                } catch (IOException e) {
+                    if (LOGGER.isEnabledFor(Level.ERROR)) {
+                        LOGGER.error(e);
+                    }
+                }
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        if (!done) {
+            if (watcher != null) {
+                watcher.close();
+            }
+            done = true;
+        }
+    }
+
+    public File next() {
+        return it.next();
+    }
+
+    private boolean endOfEvents(WatchKey key) {
+        // reset key and remove from set if directory no longer accessible
+        if (!key.reset()) {
+            keys.remove(key);
+            if (keys.isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasNext() {
+        if (it.hasNext()) {
+            return true;
+        }
+        if (done || !isFeed) {
+            return false;
+        }
+        files.clear();
+        // Read new Events (Polling first to add all available files)
+        WatchKey key;
+        key = watcher.poll();
+        while (key != null) {
+            handleEvents(key);
+            if (endOfEvents(key)) {
+                return false;
+            }
+            key = watcher.poll();
+        }
+        // No file was found, wait for the filesystem to push events
+        while (files.isEmpty()) {
+            try {
+                key = watcher.take();
+            } catch (InterruptedException x) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Feed Closed");
+                }
+                return false;
+            } catch (ClosedWatchServiceException e) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("The watcher has exited");
+                }
+                return false;
+            }
+            handleEvents(key);
+            if (endOfEvents(key)) {
+                return false;
+            }
+        }
+        // files were found, re-create the iterator and move it one step
+        it = files.iterator();
+        return it.hasNext();
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
new file mode 100644
index 0000000..0aa01b1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+public class LocalFileSystemUtils {
+    public static void traverse(final LinkedList<File> files, File root, final String expression,
+            final LinkedList<Path> dirs) throws IOException {
+        if (!Files.exists(root.toPath())) {
+            return;
+        }
+        if (!Files.isDirectory(root.toPath())) {
+            validateAndAdd(root.toPath(), expression, files);
+        }
+        Files.walkFileTree(root.toPath(), new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) throws IOException {
+                if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
+                    return FileVisitResult.TERMINATE;
+                }
+                if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
+                    if (dirs != null) {
+                        dirs.add(path);
+                    }
+                    //get immediate children files
+                    File[] content = path.toFile().listFiles();
+                    for (File file : content) {
+                        if (!file.isDirectory()) {
+                            validateAndAdd(file.toPath(), expression, files);
+                        }
+                    }
+                } else {
+                    // Path is a file, add to list of files if it matches the expression
+                    validateAndAdd(path, expression, files);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    public static void validateAndAdd(Path path, String expression, LinkedList<File> files) {
+        if (expression == null || Pattern.matches(expression, path.toString())) {
+            files.add(new File(path.toString()));
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
similarity index 94%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
index f8585bb..b62dda8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolverFactory.java
@@ -24,7 +24,7 @@
 /**
  * Factory for creating instance of {@link NodeResolver}
  */
-public class DNSResolverFactory implements INodeResolverFactory {
+public class NodeResolverFactory implements INodeResolverFactory {
 
     private static final INodeResolver INSTANCE = new NodeResolver();
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
deleted file mode 100644
index f8914a6..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import twitter4j.Status;
-import twitter4j.User;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TweetProcessor {
-
-    private IAObject[] mutableTweetFields;
-    private IAObject[] mutableUserFields;
-    private AMutableRecord mutableRecord;
-    private AMutableRecord mutableUser;
-
-    private final Map<String, Integer> userFieldNameMap = new HashMap<>();
-    private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
-
-
-    public TweetProcessor(ARecordType recordType) {
-        initFieldNames(recordType);
-        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
-                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
-        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)], mutableUserFields);
-
-        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
-                new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
-        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
-
-    }
-
-    // Initialize the hashmap values for the field names and positions
-    private void initFieldNames(ARecordType recordType) {
-        String tweetFields[] = recordType.getFieldNames();
-        for (int i=0; i<tweetFields.length; i++) {
-            tweetFieldNameMap.put(tweetFields[i], i);
-            if (tweetFields[i].equals(Tweet.USER)) {
-                IAType fieldType = recordType.getFieldTypes()[i];
-                if (fieldType.getTypeTag() == ATypeTag.RECORD) {
-                    String userFields[]  = ((ARecordType)fieldType).getFieldNames();
-                    for (int j=0; j<userFields.length; j++) {
-                        userFieldNameMap.put(userFields[j], j);
-                    }
-                }
-
-            }
-        }
-    }
-
-
-    public AMutableRecord processNextTweet(Status tweet) {
-        User user = tweet.getUser();
-
-        // Tweet user data
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)]).setValue(JObjectUtil.getNormalizedString(user.getName()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]).setValue(user.getFollowersCount());
-
-
-        // Tweet data
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
-        int userPos = tweetFieldNameMap.get(Tweet.USER);
-        for (int i = 0; i < mutableUserFields.length; i++) {
-            ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
-        }
-        if (tweet.getGeoLocation() != null) {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(tweet.getGeoLocation().getLatitude());
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(tweet.getGeoLocation().getLongitude());
-        } else {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
-        }
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]).setValue(JObjectUtil.getNormalizedString(
-                tweet.getCreatedAt().toString()));
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
-
-        for (int i = 0; i < mutableTweetFields.length; i++) {
-            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
-        }
-
-        return mutableRecord;
-
-    }
-
-    public AMutableRecord getMutableRecord() {
-        return mutableRecord;
-    }
-
-}
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index df0fb94..441b471 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -27,13 +27,14 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapter extends StreamBasedAdapter {
+public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -135,4 +136,16 @@
         return false;
     }
 
+    @Override
+    public boolean pause() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean resume() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
 }
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 71c762a..6b858f3 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.common.feeds.FeedConnectionRequest;
 import org.apache.asterix.common.feeds.FeedId;
 import org.apache.asterix.common.feeds.FeedPolicyAccessor;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
@@ -41,12 +42,11 @@
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.PrimaryFeed;
 import org.apache.asterix.metadata.entities.SecondaryFeed;
-import org.apache.asterix.metadata.feeds.FeedUtil;
+import org.apache.asterix.metadata.feeds.MetadataUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -188,14 +188,14 @@
         try {
             switch (feed.getFeedType()) {
                 case PRIMARY:
-                    Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+                    Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> factoryOutput = null;
 
-                    factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
+                    factoryOutput = MetadataUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
                             mdTxnCtx);
                     outputType = factoryOutput.second.getTypeName();
                     break;
                 case SECONDARY:
-                    outputType = FeedUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
+                    outputType = MetadataUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
                     break;
             }
             return outputType;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index a73a236..ab91632 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -40,6 +40,7 @@
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.BaseOperationTracker;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
 import org.apache.asterix.external.api.IAdapterFactory;
@@ -357,7 +358,7 @@
     private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws Exception {
         String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
         return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
-                adapterFactoryClassName, DatasourceAdapter.AdapterType.INTERNAL);
+                adapterFactoryClassName, IDataSourceAdapter.AdapterType.INTERNAL);
     }
 
     private static CompactionPolicy getCompactionPolicyEntity(String compactionPolicyClassName) throws Exception {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index c9157df..b68301f 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -47,6 +47,8 @@
 import org.apache.asterix.common.feeds.FeedConstants;
 import org.apache.asterix.common.feeds.FeedPolicyAccessor;
 import org.apache.asterix.common.feeds.api.ICentralFeedManager;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
@@ -74,7 +76,6 @@
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
@@ -87,7 +88,7 @@
 import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
 import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.FeedUtil;
+import org.apache.asterix.metadata.feeds.MetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
@@ -595,8 +596,8 @@
 
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, PrimaryFeed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
-        factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
+        Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> factoryOutput = null;
+        factoryOutput = MetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
         IAdapterFactory adapterFactory = factoryOutput.first;
         FeedIntakeOperatorDescriptor feedIngestor = null;
         switch (factoryOutput.third) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index ac98fb0..12ed0b8 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata.entities;
 
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.feeds.AdapterIdentifier;
@@ -25,11 +26,6 @@
 public class DatasourceAdapter implements IMetadataEntity {
 
     private static final long serialVersionUID = 1L;
-
-    public enum AdapterType {
-        INTERNAL,
-        EXTERNAL
-    }
 
     private final AdapterIdentifier adapterIdentifier;
     private final String classname;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index c37230d..e7f8fa3 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -26,12 +26,13 @@
 import java.util.Calendar;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.feeds.AdapterIdentifier;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;
@@ -76,7 +77,7 @@
                 .getValueByPos(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_NAME_FIELD_INDEX)).getStringValue();
         String classname = ((AString) adapterRecord
                 .getValueByPos(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_CLASSNAME_FIELD_INDEX)).getStringValue();
-        AdapterType adapterType = AdapterType.valueOf(((AString) adapterRecord
+        IDataSourceAdapter.AdapterType adapterType = IDataSourceAdapter.AdapterType.valueOf(((AString) adapterRecord
                 .getValueByPos(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_TYPE_FIELD_INDEX)).getStringValue());
 
         return new DatasourceAdapter(new AdapterIdentifier(dataverseName, adapterName), classname, adapterType);
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
index 6c2f14c..d8e50aa 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterExecutor.java
@@ -22,21 +22,19 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
 
 public class AdapterExecutor implements Runnable {
 
     private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
 
     private final DistributeFeedFrameWriter writer;
-
-    private final IDataSourceAdapter adapter;
-
+    private final IFeedAdapter adapter;
     private final IAdapterRuntimeManager adapterManager;
 
-    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IDataSourceAdapter adapter,
+    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IFeedAdapter adapter,
             IAdapterRuntimeManager adapterManager) {
         this.writer = writer;
         this.adapter = adapter;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
index aacb3da..c78700d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -25,10 +25,10 @@
 
 import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
 import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.feeds.IngestionRuntime;
 
 public class AdapterRuntimeManager implements IAdapterRuntimeManager {
 
@@ -36,7 +36,7 @@
 
     private final FeedId feedId;
 
-    private final IDataSourceAdapter feedAdapter;
+    private final IFeedAdapter feedAdapter;
 
     private final IIntakeProgressTracker tracker;
 
@@ -50,7 +50,7 @@
 
     private State state;
 
-    public AdapterRuntimeManager(FeedId feedId, IDataSourceAdapter feedAdapter, IIntakeProgressTracker tracker,
+    public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, IIntakeProgressTracker tracker,
             DistributeFeedFrameWriter writer, int partition) {
         this.feedId = feedId;
         this.feedAdapter = feedAdapter;
@@ -77,6 +77,7 @@
             }
         } finally {
             state = State.FINISHED_INGESTION;
+            // This will shut the feed down without giving chance for graceful shutdown
             executorService.shutdown();
         }
     }
@@ -92,7 +93,7 @@
     }
 
     @Override
-    public IDataSourceAdapter getFeedAdapter() {
+    public IFeedAdapter getFeedAdapter() {
         return feedAdapter;
     }
 
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
index 715b68b..b33c9ed 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
@@ -25,10 +25,10 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.feeds.FeedConnectionId;
 import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.IngestionRuntime;
 import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
 import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feeds.IngestionRuntime;
 import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
 import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
 import org.apache.asterix.om.types.ARecordType;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 54c9af5..5bf9170 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -24,11 +24,11 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.feeds.FeedId;
 import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.IngestionRuntime;
 import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
 import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feeds.IngestionRuntime;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.metadata.entities.PrimaryFeed;
 import org.apache.asterix.om.types.ARecordType;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 5085087..cda57b0 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -29,17 +29,17 @@
 import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
 import org.apache.asterix.common.feeds.FeedId;
 import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.IngestionRuntime;
 import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.common.feeds.api.IFeedManager;
 import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
 import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
 import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.feeds.IngestionRuntime;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -62,7 +62,7 @@
     private final IAdapterFactory adapterFactory;
 
     private IngestionRuntime ingestionRuntime;
-    private IDataSourceAdapter adapter;
+    private IFeedAdapter adapter;
     private IIntakeProgressTracker tracker;
     private DistributeFeedFrameWriter feedFrameWriter;
 
@@ -85,7 +85,7 @@
         try {
             if (ingestionRuntime == null) {
                 try {
-                    adapter = adapterFactory.createAdapter(ctx, partition);
+                    adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
                     //TODO: Fix record tracking
                     //                    if (adapterFactory.isRecordTrackingEnabled()) {
                     //                        tracker = adapterFactory.createIntakeProgressTracker();
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 313fa1a..ce1c564 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -37,12 +37,10 @@
 import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
 import org.apache.asterix.common.feeds.FeedRuntimeManager;
 import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
-import org.apache.asterix.common.feeds.IngestionRuntime;
 import org.apache.asterix.common.feeds.IntakePartitionStatistics;
 import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
 import org.apache.asterix.common.feeds.StorageSideMonitoredBuffer;
 import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
 import org.apache.asterix.common.feeds.api.IFeedManager;
 import org.apache.asterix.common.feeds.api.IFeedMessage;
 import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
@@ -51,6 +49,8 @@
 import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
 import org.apache.asterix.common.feeds.message.EndFeedMessage;
 import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.feeds.IngestionRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
index b409745..454124d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
@@ -98,8 +98,8 @@
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.connectionId = feedConnectionId;
-        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
+        this.feedManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.feedManager = runtimeCtx.getFeedManager();
@@ -130,11 +130,11 @@
         }
         this.fta = new FrameTupleAccessor(recordDesc);
         this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
-                nPartitions);
-        if(coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable){
+                policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta,
+                recordDesc, feedManager, nPartitions);
+        if (coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
             AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
-            if(!indexOp.isPrimary()){
+            if (!indexOp.isPrimary()) {
                 inputSideHandler.setBufferingEnabled(false);
             }
         }
@@ -149,8 +149,8 @@
         this.inputSideHandler.setCoreOperator(coreOperator);
         feedRuntime.setMode(Mode.PROCESS);
         if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Retreived state from the zombie instance from previous execution for " + runtimeType
-                    + " node.");
+            LOGGER.warning(
+                    "Retreived state from the zombie instance from previous execution for " + runtimeType + " node.");
         }
     }
 
@@ -158,7 +158,7 @@
         coreOperator.setOutputFrameWriter(0, writer, recordDesc);
         FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
         feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, (FeedRuntime) feedRuntime);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
     }
 
     @Override
@@ -216,8 +216,7 @@
 
     private void deregister() {
         if (feedRuntime != null) {
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
-                    ((FeedRuntime) feedRuntime).getRuntimeId());
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
         }
     }
 
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MetadataUtil.java
similarity index 95%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
rename to asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MetadataUtil.java
index 5ed2876..1df57bd 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MetadataUtil.java
@@ -36,19 +36,20 @@
 import org.apache.asterix.common.feeds.FeedConnectionId;
 import org.apache.asterix.common.feeds.FeedPolicyAccessor;
 import org.apache.asterix.common.feeds.FeedRuntimeId;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.bootstrap.MetadataConstants;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedPolicy;
@@ -86,9 +87,9 @@
 /**
  * A utility class for providing helper functions for feeds
  */
-public class FeedUtil {
+public class MetadataUtil {
 
-    private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
+    private static Logger LOGGER = Logger.getLogger(MetadataUtil.class.getName());
 
     public static String getFeedPointKeyRep(Feed feed, List<String> appliedFunctions) {
         StringBuilder builder = new StringBuilder();
@@ -466,26 +467,31 @@
         return preProcessingRequired;
     }
 
-    public static Triple<IAdapterFactory, ARecordType, AdapterType> getPrimaryFeedFactoryAndOutput(PrimaryFeed feed,
-            FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
-
+    public static Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> getPrimaryFeedFactoryAndOutput(
+            PrimaryFeed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx)
+                    throws AlgebricksException {
+        // This method needs to be re-visited
         String adapterName = null;
         DatasourceAdapter adapterEntity = null;
         String adapterFactoryClassname = null;
         IAdapterFactory adapterFactory = null;
         ARecordType adapterOutputType = null;
-        Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
-        AdapterType adapterType = null;
+        Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> feedProps = null;
+        IDataSourceAdapter.AdapterType adapterType = null;
         try {
             adapterName = feed.getAdaptorName();
             Map<String, String> configuration = feed.getAdaptorConfiguration();
             configuration.putAll(policyAccessor.getFeedPolicy());
             adapterOutputType = getOutputType(feed, configuration);
+
+            // Get adapter from metadata dataset <Metadata dataverse>
             adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
                     adapterName);
+            // Get adapter from metadata dataset <The feed dataverse> 
             if (adapterEntity == null) {
                 adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
             }
+
             if (adapterEntity != null) {
                 adapterType = adapterEntity.getType();
                 adapterFactoryClassname = adapterEntity.getClassname();
@@ -501,15 +507,16 @@
                         adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
                 }
+                ExternalDataUtils.prepareFeed(configuration);
                 adapterFactory.configure(configuration, adapterOutputType);
             } else {
                 configuration.put(ExternalDataConstants.KEY_DATAVERSE, feed.getDataverseName());
-                adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
-                        adapterOutputType);
-                adapterType = AdapterType.INTERNAL;
+                adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType,
+                        true);
+                adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
             }
-            feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
-                    adapterType);
+            feedProps = new Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType>(adapterFactory,
+                    adapterOutputType, adapterType);
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to create adapter " + e);
@@ -567,7 +574,7 @@
         Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
         FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
         if (appliedFunction == null) {
-            Triple<IAdapterFactory, ARecordType, AdapterType> result = getPrimaryFeedFactoryAndOutput(
+            Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> result = getPrimaryFeedFactoryAndOutput(
                     (PrimaryFeed) primaryFeed, policyAccessor, mdTxnCtx);
             outputType = result.second.getTypeName();
         } else {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/574
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e8db26a810efd1fbaa52ceeb3efd0c8328ab070
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message