asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ti...@apache.org
Subject [2/2] incubator-asterixdb git commit: ASTERIXDB-1360 Fix Error Message for Unknown Format
Date Fri, 01 Apr 2016 22:01:21 GMT
ASTERIXDB-1360 Fix Error Message for Unknown Format

This change includes some refactoring of the external
data code. The refactoring makes the code less error-prone
and separates data source selection from data parser
selection. It also fixes issue ASTERIXDB-1366 and adds
a test case for it.

Change-Id: Ib4aac833e30bd7c5a7706f5c8116383c2362c964
Reviewed-on: https://asterix-gerrit.ics.uci.edu/767
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/442e49b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/442e49b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/442e49b9

Branch: refs/heads/master
Commit: 442e49b9f174717035d0f4ebcc0810f5f3f3f29c
Parents: de6e0da
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Fri Apr 1 11:20:29 2016 +0300
Committer: Till Westmann <tillw@apache.org>
Committed: Fri Apr 1 14:55:49 2016 -0700

----------------------------------------------------------------------
 .../asterix/app/external/FeedOperations.java    |  21 +---
 .../rc-format/rc-format.1.ddl.aql               |   7 +-
 .../invalid-format/invalid-format.1.ddl.aql     |  34 ++++++
 .../invalid-format/invalid-format.2.query.aql   |  27 +++++
 .../feeds/twitter-feed/twitter-feed.1.ddl.aql   |  54 +++++++++
 .../twitter-feed/twitter-feed.2.update.aql      |  25 ++++
 .../hdfs/large-record/large-record.1.ddl.aql    |   2 +-
 .../rc-format/rc-format.1.ddl.sqlpp             |   8 +-
 .../src/test/resources/runtimets/testsuite.xml  |  82 +++++++------
 .../resources/runtimets/testsuite_sqlpp.xml     |   4 +-
 .../adapter/factory/GenericAdapterFactory.java  |  19 ++-
 .../external/input/HDFSDataSourceFactory.java   |  46 ++++++--
 .../reader/IndexingStreamRecordReader.java      | 100 ++++++++++++++++
 .../stream/AbstractStreamRecordReader.java      | 116 -------------------
 .../AbstractStreamRecordReaderFactory.java      |  95 ---------------
 .../stream/EmptyLineSeparatedRecordReader.java  |   7 +-
 .../EmptyLineSeparatedRecordReaderFactory.java  |  43 -------
 .../record/reader/stream/LineRecordReader.java  |   8 +-
 .../reader/stream/LineRecordReaderFactory.java  |  52 ---------
 .../reader/stream/QuotedLineRecordReader.java   |   6 +-
 .../stream/SemiStructuredRecordReader.java      |  16 +--
 .../SemiStructuredRecordReaderFactory.java      |  51 --------
 .../reader/stream/StreamRecordReader.java       |  88 ++++++++++++++
 .../stream/StreamRecordReaderFactory.java       |  72 ++++++++++++
 .../twitter/TwitterRecordReaderFactory.java     |  19 +--
 .../external/input/stream/HDFSInputStream.java  |   9 +-
 .../provider/DatasourceFactoryProvider.java     |  74 ++++--------
 .../provider/ParserFactoryProvider.java         |   2 +-
 .../provider/StreamRecordReaderProvider.java    |  81 +++++++++++++
 .../external/util/ExternalDataConstants.java    |  10 +-
 .../external/util/ExternalDataUtils.java        |  20 +---
 .../external/classad/test/ClassAdToADMTest.java |   2 +-
 .../parser/test/RecordWithMetaTest.java         |   2 +-
 .../external_index/external_index.2.ddl.aql     |   6 +-
 34 files changed, 662 insertions(+), 546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index 5cd490a..b98618d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -71,26 +71,17 @@ public class FeedOperations {
      */
     public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
             AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
-
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
         IAdapterFactory adapterFactory = null;
         IOperatorDescriptor feedIngestor;
         AlgebricksPartitionConstraint ingesterPc;
-
-        try {
-            Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
-                    .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
-            feedIngestor = t.first;
-            ingesterPc = t.second;
-            adapterFactory = t.third;
-        } catch (AlgebricksException e) {
-            e.printStackTrace();
-            throw new AsterixException(e);
-        }
-
+        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
+                .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
+        feedIngestor = t.first;
+        ingesterPc = t.second;
+        adapterFactory = t.third;
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
-
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
         spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
@@ -252,7 +243,7 @@ public class FeedOperations {
     private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
             JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
             FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         IFeedMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId,
                 completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
         return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
index 5a7294c..4ffc5a7 100644
--- a/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
@@ -36,6 +36,11 @@ create type EmployeeType as closed {
 
 create external dataset EmployeeDataset(EmployeeType)
 using hdfs
-(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.rc"),("input-format"="rc-input-format"),("format"="binary"),("parser"="hive-parser"),("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
+(("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.rc"),
+("input-format"="rc-input-format"),
+("format"="hdfs-writable"),
+("parser"="hive-parser"),
+("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
 
 create index EmployeeAgeIdx on EmployeeDataset(age);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql
new file mode 100644
index 0000000..7c668e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Testing an external dataset with invalid adapter format parameter value
+ * Expected result: fail - Unknown data format.
+ */
+
+drop dataverse temp if exists;
+create dataverse temp
+use dataverse temp;
+
+create type test as closed {
+  id: int32
+};
+
+create external dataset testds (test) using localfs(
+("path"="asterix_nc1://data/csv/sample_04_quote_error.csv"),
+("format"="add"));
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql
new file mode 100644
index 0000000..438e0b6
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Testing an external dataset with invalid adapter format parameter value
+ * Expected result: fail - Unknown data format.
+ */
+
+use dataverse temp;
+
+for $i in dataset testds
+return $i;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql
new file mode 100644
index 0000000..d7827c5
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a twitter feed with missing parameters
+ * Expected Res : Failure
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUser if not exists as open{
+screen_name: string,
+language: string,
+friends_count: int32,
+status_count: int32,
+name: string,
+followers_count: int32
+};
+
+create type Tweet if not exists as open{
+id: string,
+user: TwitterUser,
+latitude:double,
+longitude:double,
+created_at:string,
+message_text:string
+};
+
+create dataset Tweets (Tweet)
+primary key id;
+
+create feed TwitterFeed using push_twitter(
+("type-name"="Tweet"),
+("format"="twitter-status"),
+("consumer.key"="************"),
+("access.token"="**********"),
+("access.token.secret"="*************"));
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql
new file mode 100644
index 0000000..6712969
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a twitter feed with missing parameters
+ * Expected Res : Failure
+ */
+
+use dataverse feeds;
+connect feed TwitterFeed to dataset Tweets;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
index 000ef5b..4e306b3 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
@@ -37,7 +37,7 @@ create type EmployeeType as closed {
 create external dataset EmployeeDataset(EmployeeType)
 using adapter
 (("reader"="hdfs"),
-("parser"="delimited-text"),
+("format"="delimited-text"),
 ("hdfs"="hdfs://127.0.0.1:31888"),
 ("path"="/asterix/large-record.txt"),
 ("input-format"="text-input-format"),

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp b/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
index b14f3cb..4736dee 100644
--- a/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
+++ b/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
@@ -37,7 +37,13 @@ create type test.EmployeeType as
   age : int64
 }
 
-create external  table EmployeeDataset(EmployeeType) using "hdfs"(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.rc"),("input-format"="rc-input-format"),("format"="binary"),("parser"="hive-parser"),("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
+create external  table EmployeeDataset(EmployeeType) using "hdfs"(
+("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.rc"),
+("input-format"="rc-input-format"),
+("format"="hdfs-writable"),
+("parser"="hive-parser"),
+("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
 
 create  index EmployeeAgeIdx  on EmployeeDataset (age) type btree;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 3a5140c..40ba82d 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -28,6 +28,46 @@
         ResultOffsetPath="results"
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
+    <test-group name="external">
+        <test-case FilePath="external">
+            <compilation-unit name="invalid-format">
+                <output-dir compare="Text">invalid-format</output-dir>
+                <expected-error>Unknown format</expected-error>
+            </compilation-unit>
+        </test-case>
+    </test-group>
+    <test-group name="external-indexing">
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="text-format">
+                <output-dir compare="Text">text-format</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="sequence-format">
+                <output-dir compare="Text">sequence-format</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="rc-format">
+                <output-dir compare="Text">rc-format</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="rtree-index">
+                <output-dir compare="Text">rtree-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="leftouterjoin">
+                <output-dir compare="Text">leftouterjoin</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="leftouterjoin-rtree">
+                <output-dir compare="Text">leftouterjoin-rtree</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group>
     <test-group name="external-library">
         <test-case FilePath="external-library">
             <compilation-unit name="typed_adapter">
@@ -52,6 +92,12 @@
     </test-group>
     <test-group name="feeds">
         <test-case FilePath="feeds">
+            <compilation-unit name="twitter-feed">
+                <output-dir compare="Text">twitter-feed</output-dir>
+                <expected-error>One or more parameters are missing from adapter configuration</expected-error>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
             <compilation-unit name="feed-with-external-parser">
                 <output-dir compare="Text">feed-with-external-parser</output-dir>
             </compilation-unit>
@@ -6256,13 +6302,13 @@
         <test-case FilePath="load">
             <compilation-unit name="issue14_query">
                 <output-dir compare="Text">issue14_query</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+                <expected-error>The parameter format must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">
             <compilation-unit name="issue315_query">
                 <output-dir compare="Text">none</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+                <expected-error>The parameter format must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">
@@ -6390,38 +6436,6 @@
             </compilation-unit>
         </test-case>
     </test-group>
-    <test-group name="external-indexing">
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="text-format">
-                <output-dir compare="Text">text-format</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="sequence-format">
-                <output-dir compare="Text">sequence-format</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="rc-format">
-                <output-dir compare="Text">rc-format</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="rtree-index">
-                <output-dir compare="Text">rtree-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="leftouterjoin">
-                <output-dir compare="Text">leftouterjoin</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="leftouterjoin-rtree">
-                <output-dir compare="Text">leftouterjoin-rtree</output-dir>
-            </compilation-unit>
-        </test-case>
-    </test-group>
     <test-group name="temporal">
         <test-case FilePath="temporal">
             <compilation-unit name="overlap_bins_gby_3">

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index c343570..f5bb7ce 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6233,13 +6233,13 @@
         <test-case FilePath="load">
             <compilation-unit name="issue14_query">
                 <output-dir compare="Text">issue14_query</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+                <expected-error>The parameter format must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">
             <compilation-unit name="issue315_query">
                 <output-dir compare="Text">none</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+                <expected-error>The parameter format must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index a03ad1a..041f706 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -120,9 +120,14 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         this.metaType = metaType;
         this.configuration = configuration;
         dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
-
         dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
-        prepare();
+        if (dataSourceFactory.isIndexible() && (files != null)) {
+            ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
+        }
+        dataSourceFactory.configure(configuration);
+        dataParserFactory.setRecordType(recordType);
+        dataParserFactory.setMetaType(metaType);
+        dataParserFactory.configure(configuration);
         ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
         configureFeedLogManager();
         nullifyExternalObjects();
@@ -145,16 +150,6 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         }
     }
 
-    private void prepare() throws AsterixException {
-        if (dataSourceFactory.isIndexible() && (files != null)) {
-            ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
-        }
-        dataSourceFactory.configure(configuration);
-        dataParserFactory.setRecordType(recordType);
-        dataParserFactory.setMetaType(metaType);
-        dataParserFactory.configure(configuration);
-    }
-
     @Override
     public ARecordType getAdapterOutputType() {
         return recordType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index de185e0..529977a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -25,15 +25,20 @@ import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingScheduler;
+import org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
 import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.io.Writable;
@@ -48,8 +53,7 @@ import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
 
-public class HDFSDataSourceFactory
-        implements IInputStreamFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
+public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
 
     protected static final long serialVersionUID = 1L;
     protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@@ -69,6 +73,7 @@ public class HDFSDataSourceFactory
     private JobConf conf;
     private InputSplit[] inputSplits;
     private String nodeName;
+    private Format format;
 
     @Override
     public void configure(Map<String, String> configuration) throws AsterixException {
@@ -94,10 +99,14 @@ public class HDFSDataSourceFactory
             inputSplitsFactory = new InputSplitsFactory(inputSplits);
             read = new boolean[readSchedule.length];
             Arrays.fill(read, false);
-            if (!ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM)) {
+            String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
+            if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
                 RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
                 this.recordClass = reader.createValue().getClass();
                 reader.close();
+            } else {
+                format = StreamRecordReaderProvider.getReaderFormat(configuration);
+                this.recordClass = char[].class;
             }
         } catch (IOException e) {
             throw new AsterixException(e);
@@ -117,8 +126,8 @@ public class HDFSDataSourceFactory
      * 1. when target files are not null, it generates a file aware input stream that validate
      * against the files
      * 2. if the data is binary, it returns a generic reader */
-    @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer)
+            throws HyracksDataException {
         try {
             if (!configured) {
                 conf = confFactory.getConf();
@@ -126,7 +135,7 @@ public class HDFSDataSourceFactory
                 nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
                 configured = true;
             }
-            return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files);
+            return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
@@ -170,15 +179,34 @@ public class HDFSDataSourceFactory
         return ExternalDataUtils.getDataSourceType(configuration);
     }
 
+    /**
+     * HDFS Datasource is a special case in two ways:
+     * 1. It supports indexing.
+     * 2. It returns input as a set of writable object that we sometimes internally transform into a byte stream
+     * Hence, it can produce:
+     * 1. StreamRecordReader: When we transform the input into a byte stream.
+     * 2. Indexing Stream Record Reader: When we transform the input into a byte stream and perform indexing.
+     * 3. HDFS Record Reader: When we simply pass the Writable object as it is to the parser.
+     */
     @Override
-    public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         try {
+            IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
+            if (format != null) {
+                StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(format,
+                        createInputStream(ctx, partition, indexer), configuration);
+                if (indexer != null) {
+                    return new IndexingStreamRecordReader(streamReader, indexer);
+                } else {
+                    return streamReader;
+                }
+            }
             JobConf conf = confFactory.getConf();
             InputSplit[] inputSplits = inputSplitsFactory.getSplits();
             String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
             return new HDFSRecordReader<Object, Writable>(read, inputSplits, readSchedule, nodeName, conf, files,
-                    files == null ? null : ExternalIndexerProvider.getIndexer(configuration));
+                    indexer);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
new file mode 100644
index 0000000..2c2dd98
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+
+    private StreamRecordReader reader;
+    private IExternalIndexer indexer;
+
+    public IndexingStreamRecordReader(StreamRecordReader reader, IExternalIndexer indexer) {
+        this.reader = reader;
+        this.indexer = indexer;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return reader.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        return reader.next();
+    }
+
+    @Override
+    public boolean stop() {
+        return reader.stop();
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        reader.setController(controller);
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        reader.setFeedLogManager(feedLogManager);
+    }
+
+    @Override
+    public List<ExternalFile> getSnapshot() {
+        return null;
+    }
+
+    @Override
+    public int getCurrentSplitIndex() {
+        return -1;
+    }
+
+    @Override
+    public RecordReader<?, ? extends Writable> getReader() {
+        return null;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return reader.handleException(th);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
deleted file mode 100644
index 7d6c1f3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordReader;
-
-public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
-    protected final AsterixInputStreamReader reader;
-    protected CharArrayRecord record;
-    protected char[] inputBuffer;
-    protected int bufferLength = 0;
-    protected int bufferPosn = 0;
-    protected final IExternalIndexer indexer;
-    protected boolean done = false;
-    protected FeedLogManager feedLogManager;
-
-    public AbstractStreamRecordReader(AsterixInputStream inputStream, IExternalIndexer indexer) {
-        this.reader = new AsterixInputStreamReader(inputStream);
-        this.indexer = indexer;
-        record = new CharArrayRecord();
-        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
-    }
-
-    @Override
-    public IRawRecord<char[]> next() throws IOException {
-        return record;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!done) {
-            reader.close();
-        }
-        done = true;
-    }
-
-    @Override
-    public IExternalIndexer getIndexer() {
-        return indexer;
-    }
-
-    @Override
-    public boolean stop() {
-        try {
-            reader.stop();
-            return true;
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-
-    @Override
-    public void setController(AbstractFeedDataFlowController controller) {
-        reader.setController(controller);
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-        this.feedLogManager = feedLogManager;
-        reader.setFeedLogManager(feedLogManager);
-    }
-
-    @Override
-    public boolean handleException(Throwable th) {
-        return reader.handleException(th);
-    }
-
-    //TODO: Fix the following method since they don't fit
-    //Already the fix is in another local branch
-    @Override
-    public List<ExternalFile> getSnapshot() {
-        return null;
-    }
-
-    @Override
-    public int getCurrentSplitIndex() {
-        return -1;
-    }
-
-    @Override
-    public RecordReader<?, Writable> getReader() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
deleted file mode 100644
index 2c82f47..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IInputStreamFactory;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class AbstractStreamRecordReaderFactory<T>
-        implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
-
-    private static final long serialVersionUID = 1L;
-    protected IInputStreamFactory inputStreamFactory;
-    protected Map<String, String> configuration;
-
-    public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
-            IInputStreamFactory inputStreamFactory) {
-        this.inputStreamFactory = inputStreamFactory;
-        return this;
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
-        return inputStreamFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws AsterixException {
-        this.configuration = configuration;
-        inputStreamFactory.configure(configuration);
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return inputStreamFactory.isIndexible();
-    }
-
-    @Override
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
-        ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
-    }
-
-    @Override
-    public boolean isIndexingOp() {
-        if (inputStreamFactory.isIndexible()) {
-            return ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
-        }
-        return false;
-    }
-
-    protected Pair<AsterixInputStream, IExternalIndexer> getStreamAndIndexer(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        AsterixInputStream inputStream = inputStreamFactory.createInputStream(ctx, partition);
-        IExternalIndexer indexer = null;
-        if (inputStreamFactory.isIndexible()) {
-            if (((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
-                indexer = ((IIndexingDatasource) inputStream).getIndexer();
-            }
-        }
-        return new Pair<AsterixInputStream, IExternalIndexer>(inputStream, indexer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 6964a82..aa0451a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -21,13 +21,12 @@ package org.apache.asterix.external.input.record.reader.stream;
 import java.io.IOException;
 
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
 
-public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
+public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 
-    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, IExternalIndexer indexer) {
-        super(inputStream, indexer);
+    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
+        super(inputStream);
     }
 
     private boolean prevCharCR;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
deleted file mode 100644
index 063ed11..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class EmptyLineSeparatedRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        final Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
-        return new EmptyLineSeparatedRecordReader(streamAndIndexer.first, streamAndIndexer.second);
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 3089295..8572fc7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -21,19 +21,17 @@ package org.apache.asterix.external.input.record.reader.stream;
 import java.io.IOException;
 
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class LineRecordReader extends AbstractStreamRecordReader {
+public class LineRecordReader extends StreamRecordReader {
 
     protected boolean prevCharCR;
     protected int newlineLength;
     protected int recordNumber = 0;
 
-    public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final IExternalIndexer indexer)
-            throws HyracksDataException {
-        super(stream, indexer);
+    public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
+        super(stream);
         try {
             if (hasHeader) {
                 if (hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
deleted file mode 100644
index 4d44001..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
-        boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-        Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
-        if (quoteString != null) {
-            return new QuotedLineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second, quoteString);
-        } else {
-            return new LineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second);
-        }
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index abd2952..515e0e5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -32,9 +32,9 @@ public class QuotedLineRecordReader extends LineRecordReader {
     private boolean prevCharEscape;
     private boolean inQuote;
 
-    public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream,
-            final IExternalIndexer indexer, final String quoteString) throws HyracksDataException {
-        super(hasHeader, stream, indexer);
+    public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final String quoteString)
+            throws HyracksDataException {
+        super(hasHeader, stream);
         if ((quoteString == null) || (quoteString.length() != 1)) {
             throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
                     ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 7339bfd..26ac3cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,13 +20,13 @@ package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+public class SemiStructuredRecordReader extends StreamRecordReader {
 
     private int depth;
     private boolean prevCharEscape;
@@ -35,13 +35,13 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
     private char recordEnd;
     private int recordNumber = 0;
 
-    public SemiStructuredRecordReader(AsterixInputStream stream, IExternalIndexer indexer, String recStartString,
-            String recEndString) throws AsterixException {
-        super(stream, indexer);
+    public SemiStructuredRecordReader(AsterixInputStream stream, String recStartString, String recEndString)
+            throws HyracksDataException {
+        super(stream);
         // set record opening char
         if (recStartString != null) {
             if (recStartString.length() != 1) {
-                throw new AsterixException(
+                throw new HyracksDataException(
                         ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
                                 ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
             }
@@ -52,7 +52,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
         // set record ending char
         if (recEndString != null) {
             if (recEndString.length() != 1) {
-                throw new AsterixException(
+                throw new HyracksDataException(
                         ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
                                 ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
             }
@@ -67,7 +67,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
     }
 
     @Override
-    public boolean hasNext() throws Exception {
+    public boolean hasNext() throws IOException {
         if (done) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
deleted file mode 100644
index 0f50204..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
-        try {
-            return new SemiStructuredRecordReader(streamAndIndexer.first, streamAndIndexer.second,
-                    configuration.get(ExternalDataConstants.KEY_RECORD_START),
-                    configuration.get(ExternalDataConstants.KEY_RECORD_END));
-        } catch (AsterixException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
new file mode 100644
index 0000000..57ef3ae
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+
+/**
+ * Base class for record readers that read characters from an {@link AsterixInputStream} and expose them as
+ * {@code char[]} records.
+ * <p>
+ * The stream is wrapped in an {@link AsterixInputStreamReader}; stop, close, feed logging, controller and
+ * exception handling are delegated to it. Subclasses implement {@link #hasNext()}, and {@link #next()} always
+ * returns the same reusable {@link #record} instance.
+ */
+public abstract class StreamRecordReader implements IRecordReader<char[]> {
+    private static final Logger LOGGER = Logger.getLogger(StreamRecordReader.class.getName());
+
+    // Character reader wrapping the input stream; the lifecycle methods below forward to it.
+    protected final AsterixInputStreamReader reader;
+    // The single record instance handed out by every call to next().
+    protected CharArrayRecord record;
+    // Read buffer of DEFAULT_BUFFER_SIZE chars; allocated here, presumably filled by subclasses.
+    protected char[] inputBuffer;
+    // Buffer bookkeeping for subclasses (presumably fill length and read position); unused in this class.
+    protected int bufferLength = 0;
+    protected int bufferPosn = 0;
+    // Once true, close() no longer closes the underlying reader; set by close() (subclasses may also set it).
+    protected boolean done = false;
+    // Log manager last passed to setFeedLogManager(); available to subclasses.
+    protected FeedLogManager feedLogManager;
+
+    /**
+     * @param inputStream the stream to read characters from
+     */
+    public StreamRecordReader(AsterixInputStream inputStream) {
+        this.reader = new AsterixInputStreamReader(inputStream);
+        record = new CharArrayRecord();
+        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    }
+
+    /**
+     * Returns the shared {@link #record}; the same instance is returned on every call.
+     */
+    @Override
+    public IRawRecord<char[]> next() throws IOException {
+        return record;
+    }
+
+    /**
+     * Closes the underlying reader unless {@link #done} is already set, then marks this reader as done.
+     */
+    @Override
+    public void close() throws IOException {
+        if (!done) {
+            reader.close();
+        }
+        done = true;
+    }
+
+    /**
+     * Asks the underlying reader to stop.
+     *
+     * @return {@code true} if stopping succeeded, {@code false} if it threw (the failure is logged)
+     */
+    @Override
+    public boolean stop() {
+        try {
+            reader.stop();
+            return true;
+        } catch (Exception e) {
+            // Best effort: report the failure through the return value rather than propagating it.
+            LOGGER.log(Level.WARNING, "Failed to stop the stream record reader", e);
+            return false;
+        }
+    }
+
+    /**
+     * Implemented by subclasses; presumably loads the next record into {@link #record} and reports whether one
+     * was found.
+     */
+    @Override
+    public abstract boolean hasNext() throws IOException;
+
+    /**
+     * Keeps the log manager for subclasses and forwards it to the underlying reader.
+     */
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        // Store it as well: the protected field is otherwise never assigned and would stay null.
+        this.feedLogManager = feedLogManager;
+        reader.setFeedLogManager(feedLogManager);
+    }
+
+    /**
+     * Forwards the feed data-flow controller to the underlying reader.
+     */
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        reader.setController(controller);
+    }
+
+    /**
+     * Delegates exception handling to the underlying reader and returns its result.
+     */
+    @Override
+    public boolean handleException(Throwable th) {
+        return reader.handleException(th);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
new file mode 100644
index 0000000..f743a3f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Record reader factory that turns any {@link IInputStreamFactory} into a source of {@code char[]} records.
+ * <p>
+ * Where the input comes from is decided by the wrapped stream factory; which reader consumes it is decided by
+ * {@link StreamRecordReaderProvider} based on the adapter configuration.
+ */
+public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    // Provides the per-partition input streams and the partition constraint.
+    protected final IInputStreamFactory streamFactory;
+    // Adapter configuration captured by configure() and handed to every reader created later.
+    protected Map<String, String> configuration;
+    // Reader format resolved from the configuration in configure().
+    protected Format format;
+
+    /**
+     * @param factory the stream factory supplying the raw input of each partition
+     */
+    public StreamRecordReaderFactory(IInputStreamFactory factory) {
+        streamFactory = factory;
+    }
+
+    /**
+     * Records the configuration, configures the wrapped stream factory, then resolves the reader format.
+     */
+    @Override
+    public void configure(Map<String, String> config) throws AsterixException {
+        configuration = config;
+        streamFactory.configure(config);
+        format = StreamRecordReaderProvider.getReaderFormat(config);
+    }
+
+    /**
+     * Creates the input stream for {@code partition} and wraps it in a reader for the configured format.
+     */
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return StreamRecordReaderProvider.createRecordReader(format,
+                streamFactory.createInputStream(ctx, partition), configuration);
+    }
+
+    /**
+     * The partition constraint is taken directly from the underlying stream factory.
+     */
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
+        return streamFactory.getPartitionConstraint();
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 7ca185f..541737a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -27,7 +27,6 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.TwitterUtil;
 import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
 import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
@@ -71,10 +70,10 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
             builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
             builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
-            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
             throw new AsterixException(builder.toString());
         }
-        if (ExternalDataUtils.isPull(configuration)) {
+        if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
             pull = true;
             if (configuration.get(SearchAPIConstants.QUERY) == null) {
                 throw new AsterixException(
@@ -95,12 +94,18 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
                             + DEFAULT_INTERVAL + ")");
                 }
             }
-        } else if (ExternalDataUtils.isPush(configuration)) {
-            pull = false;
         } else {
-            throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
-                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
+            pull = false;
+        }
+    }
+
+    public static boolean isTwitterPull(Map<String, String> configuration) {
+        String reader = configuration.get(ExternalDataConstants.KEY_READER); // null when no reader is set
+        if (ExternalDataConstants.READER_TWITTER_PULL.equals(reader) // constant-first: null-safe
+                || ExternalDataConstants.READER_PULL_TWITTER.equals(reader)) {
+            return true; // reader is one of the two pull-based Twitter reader names
         }
+        return false; // any other reader name, or no reader at all
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 063b8fa..997c254 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -28,7 +28,6 @@ import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IIndexingDatasource;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,8 +62,8 @@ public class HDFSInputStream extends AsterixInputStream implements IIndexingData
 
     @SuppressWarnings("unchecked")
     public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot)
-                    throws IOException, AsterixException {
+            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot, IExternalIndexer indexer)
+            throws IOException, AsterixException {
         this.read = read;
         this.inputSplits = inputSplits;
         this.readSchedule = readSchedule;
@@ -74,15 +73,13 @@ public class HDFSInputStream extends AsterixInputStream implements IIndexingData
         this.reader = new EmptyRecordReader<Object, Text>();
         this.snapshot = snapshot;
         this.hdfs = FileSystem.get(conf);
+        this.indexer = indexer;
         nextInputSplit();
         this.value = new Text();
         if (snapshot != null) {
-            this.indexer = ExternalIndexerProvider.getIndexer(configuration);
             if (currentSplitIndex < snapshot.size()) {
                 indexer.reset(this);
             }
-        } else {
-            this.indexer = null;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f8d64e0..0f24f91 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -29,11 +29,10 @@ import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
 import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
 import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -53,21 +52,18 @@ public class DatasourceFactoryProvider {
         }
     }
 
-    public static IInputStreamFactory getInputStreamFactory(String streamSource,
-            Map<String, String> configuration) throws AsterixException {
+    public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
+            throws AsterixException {
         IInputStreamFactory streamSourceFactory;
         if (ExternalDataUtils.isExternal(streamSource)) {
             String dataverse = ExternalDataUtils.getDataverse(configuration);
             streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
         } else {
             switch (streamSource) {
-                case ExternalDataConstants.STREAM_HDFS:
-                    streamSourceFactory = new HDFSDataSourceFactory();
-                    break;
                 case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
                     streamSourceFactory = new LocalFSInputStreamFactory();
                     break;
-                case ExternalDataConstants.STREAM_SOCKET:
+                case ExternalDataConstants.SOCKET:
                 case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
                     streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
@@ -89,59 +85,29 @@ public class DatasourceFactoryProvider {
         if (reader.equals(ExternalDataConstants.EXTERNAL)) {
             return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
         }
-        String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
-        IInputStreamFactory inputStreamFactory;
-        switch (parser) {
-            case ExternalDataConstants.FORMAT_ADM:
-            case ExternalDataConstants.FORMAT_JSON:
-            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                return new EmptyLineSeparatedRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-            case ExternalDataConstants.FORMAT_CSV:
-                inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
-                switch (reader) {
-                    case ExternalDataConstants.READER_KV:
-                        return new KVReaderFactory();
-                    case ExternalDataConstants.READER_KV_TEST:
-                        return new KVTestReaderFactory();
-                }
-        }
-        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
-        if (format != null) {
-            switch (format) {
-                case ExternalDataConstants.FORMAT_ADM:
-                case ExternalDataConstants.FORMAT_JSON:
-                case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                    inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                    return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-                case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                    inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                    return new EmptyLineSeparatedRecordReaderFactory()
-                            .setInputStreamFactoryProvider(inputStreamFactory);
-                case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-                case ExternalDataConstants.FORMAT_CSV:
-                    inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                    return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            }
-        }
         switch (reader) {
+            case ExternalDataConstants.READER_KV:
+                return new KVReaderFactory();
+            case ExternalDataConstants.READER_KV_TEST:
+                return new KVTestReaderFactory();
             case ExternalDataConstants.READER_HDFS:
                 return new HDFSDataSourceFactory();
+            case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
+                return new StreamRecordReaderFactory(new LocalFSInputStreamFactory());
             case ExternalDataConstants.READER_TWITTER_PULL:
             case ExternalDataConstants.READER_TWITTER_PUSH:
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+            case ExternalDataConstants.READER_PULL_TWITTER:
                 return new TwitterRecordReaderFactory();
-            case ExternalDataConstants.READER_KV:
-                return new KVReaderFactory();
-            case ExternalDataConstants.READER_KV_TEST:
-                return new KVTestReaderFactory();
             case ExternalDataConstants.TEST_RECORD_WITH_PK:
                 return new RecordWithPKTestReaderFactory();
+            case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
+                return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
+            case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+            case ExternalDataConstants.SOCKET:
+                return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());
+            case ExternalDataConstants.STREAM_SOCKET_CLIENT:
+                return new StreamRecordReaderFactory(new SocketClientInputStreamFactory());
             default:
                 throw new AsterixException("unknown record reader factory: " + reader);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/442e49b9/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 06928b3..682fb89 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -70,7 +70,7 @@ public class ParserFactoryProvider {
             case ExternalDataConstants.TEST_RECORD_WITH_PK:
                 return new TestRecordWithPKParserFactory();
             default:
-                throw new AsterixException("Unknown parser " + parser);
+                throw new AsterixException("Unknown format: " + parser);
         }
     }
 }


Mime
View raw message