asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject asterixdb git commit: Improve External Data
Date Sat, 11 Jun 2016 00:43:37 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master e639c7044 -> 67dabe36b


Improve External Data

This change enables an adapter to specify its own data parser, rather than
having the user specify it and pass it as part of the adapter configuration.
In addition, it introduces a new parameter, "parser-factory", that can be
used to specify a parser factory directly, instead of using the "parser"
parameter to specify a parser factory.

Change-Id: Iae2560c73fa63e9454f731b8e893ae779a2ac7d9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/906
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/67dabe36
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/67dabe36
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/67dabe36

Branch: refs/heads/master
Commit: 67dabe36b01081f6bb0f540db26aa74cb3dd2641
Parents: e639c70
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Sat Jun 11 00:51:16 2016 +0300
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Fri Jun 10 17:42:30 2016 -0700

----------------------------------------------------------------------
 .../optimizer/rules/UnnestToDataScanRule.java   |   7 +-
 .../app/external/FeedWorkCollection.java        |   2 +-
 .../asterix/aql/translator/QueryTranslator.java |  13 +-
 .../change-feed-with-meta-pk-in-meta.1.ddl.aql  |   2 -
 .../queries/feeds/feeds_01/feeds_01.1.ddl.aql   |   4 +-
 .../queries/feeds/feeds_03/feeds_03.1.ddl.aql   |   4 +-
 .../results/feeds/feeds_01/feeds_01.1.adm       |   2 +-
 .../results/feeds/feeds_03/feeds_03.1.adm       |   2 +-
 .../src/test/resources/runtimets/testsuite.xml  |   5 +-
 .../resources/runtimets/testsuite_sqlpp.xml     |   4 +-
 asterixdb/asterix-external-data/pom.xml         |   2 +-
 .../adapter/factory/GenericAdapterFactory.java  |  30 ++++-
 .../asterix/external/api/IAdapterFactory.java   |  20 +--
 .../external/api/IRecordWithMetaDataParser.java |  26 ----
 .../external/api/IRecordWithMetadataParser.java |  30 +++++
 .../ChangeFeedWithMetaDataFlowController.java   |  10 +-
 .../FeedWithMetaDataFlowController.java         |  12 +-
 .../input/record/RecordWithMetadataAndPK.java   |   2 +-
 .../external/input/record/RecordWithPK.java     |   2 +-
 .../record/converter/DCPConverterFactory.java   |   2 +-
 .../converter/DCPMessageToRecordConverter.java  | 123 +++++++++++++++++
 ...questToRecordWithMetadataAndPKConverter.java | 124 ------------------
 .../operators/FeedIntakeOperatorDescriptor.java |   3 +-
 .../parser/RecordWithMetadataParser.java        |   9 +-
 .../provider/AdapterFactoryProvider.java        |  13 +-
 .../provider/DataflowControllerProvider.java    |   8 +-
 .../provider/ParserFactoryProvider.java         |   7 +-
 .../provider/StreamRecordReaderProvider.java    |   2 +-
 .../external/util/ExternalDataConstants.java    |   2 +-
 .../external/util/ExternalDataUtils.java        |  20 ++-
 .../generator/test/DCPGeneratorTest.java        |   4 +-
 .../asterix/external/library/ClassAdParser.java |   5 +-
 .../adapter/TestTypedAdapterFactory.java        |  30 ++++-
 .../external/parser/TestRecordWithPKParser.java |   2 +-
 .../parser/test/RecordWithMetaTest.java         |   2 +-
 .../metadata/feeds/FeedMetadataUtil.java        | 131 +++++++++++++++----
 36 files changed, 399 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index a0060b4..327154f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -234,7 +234,12 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
             if (metaTypeName == null) {
                 throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
             }
-            metaType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), metaTypeName);
+            String dataverseName = aqlId.getDataverseName();
+            if (metaTypeName.contains(".")) {
+                dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
+                metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
+            }
+            metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
         }
         // Is a change feed?
         List<IAType> pkTypes = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index a807515..7be79fd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -96,7 +96,7 @@ public class FeedWorkCollection {
                     }
                 } catch (Exception e) {
                     if (LOGGER.isEnabledFor(Level.FATAL)) {
-                        LOGGER.fatal("Exception in executing " + request);
+                        LOGGER.fatal("Exception in executing " + request, e);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 6c5f70a..3d91c7d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -861,7 +861,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                         if (subType.isOpen()) {
                             isOpen = true;
                             break;
-                        } ;
+                        };
                     }
                 }
                 if (fieldExpr.second == null) {
@@ -1379,8 +1379,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
 
-            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList =
-                    new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
+            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 // prepare job spec(s) that would disconnect any active feeds involving the dataset.
                 List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
@@ -1950,7 +1949,7 @@ public class QueryTranslator extends AbstractLangTranslator {
 
     private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -1999,7 +1998,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 default:
                     throw new IllegalStateException();
             }
-
+            FeedMetadataUtil.validateFeed(feed, mdTxnCtx);
             MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -2260,7 +2259,7 @@ public class QueryTranslator extends AbstractLangTranslator {
      */
     private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws MetadataException {
+                    throws MetadataException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
         List<String> functionsToApply = new ArrayList<String>();
@@ -2920,7 +2919,7 @@ public class QueryTranslator extends AbstractLangTranslator {
     private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, AsterixException, Exception {
+                    throws AlgebricksException, AsterixException, Exception {
         // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
index 31d6ea8..2c74ac7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
@@ -31,11 +31,9 @@ create type DocumentType as open{
 
 create type KVMetaType as open{
 "key":string,
-bucket:string,
 vbucket:int32,
 seq:int64,
 cas:int64,
-creationTime:int64,
 expiration:int32,
 flags:int32,
 revSeq:int64,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
index 69f87d5..171c067 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
@@ -37,5 +37,5 @@ create dataset Tweets(TweetType)
 primary key id;
 
 create feed TweetFeed
-using file_feed
-(("output-type-name"="TweetType"),("fs"="localfs"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
+using localfs
+(("type-name"="TweetType"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
index 3b1595a..5a2a712 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
@@ -42,6 +42,6 @@ $x
 }
 
 create feed TweetFeed
-using file_feed
-(("output-type-name"="TweetType"),("fs"="localfs"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
+using localfs
+(("type-name"="TweetType"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
 apply function feed_processor;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 280974e..ff53e4c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "Timestamp": "Wed Jun 01 19:07:55 PDT 2016" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "localfs", "AdapterConfiguration": {{ { "Name": "type-name", "Value": "TweetType" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "is-feed", "Value": "true" }, { "Name": "dataverse", "Value": "feeds" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "reader", "Value": "localfs" }, { "Name": "parser", "Value": "adm" } }} }, "Timestamp": "Mon Jun 06 01:59:57 AST 2016" }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index 2581941..8827fca 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": "feed_processor", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "Timestamp": "Wed Jun 01 19:07:55 PDT 2016" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": "feed_processor", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "localfs", "AdapterConfiguration": {{ { "Name": "type-name", "Value": "TweetType" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "is-feed", "Value": "true" }, { "Name": "dataverse", "Value": "feeds" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "reader", "Value": "localfs" }, { "Name": "parser", "Value": "adm" } }} }, "Timestamp": "Mon Jun 06 02:04:07 AST 2016" }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 13acfc9..be663e3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -105,6 +105,7 @@
       <compilation-unit name="twitter-feed">
         <output-dir compare="Text">twitter-feed</output-dir>
         <expected-error>One or more parameters are missing from adapter configuration</expected-error>
+        <expected-error>Unknown source feed</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
@@ -6378,13 +6379,13 @@
     <test-case FilePath="load">
       <compilation-unit name="issue14_query">
         <output-dir compare="Text">issue14_query</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Unspecified parameter: format</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="issue315_query">
         <output-dir compare="Text">none</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Invalid path</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index dc4a17d..1bc4ffe 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6400,13 +6400,13 @@
     <test-case FilePath="load">
       <compilation-unit name="issue14_query">
         <output-dir compare="Text">issue14_query</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Unspecified parameter: format</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="issue315_query">
         <output-dir compare="Text">none</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Invalid path</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 7e82491..c08d6a4 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -293,7 +293,7 @@
     <dependency>
       <groupId>com.couchbase.client</groupId>
       <artifactId>core-io</artifactId>
-      <version>1.2.7</version>
+      <version>1.2.8</version>
     </dependency>
     <dependency>
       <groupId>io.reactivex</groupId>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d3abd50..22f046f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -94,8 +94,9 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
             }
             feedLogManager.touch();
         }
-        IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
-                dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
+        IDataFlowController controller =
+                DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
+                        dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
         if (isFeed) {
             return new FeedAdapter((AbstractFeedDataFlowController) controller);
         } else {
@@ -122,17 +123,17 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     }
 
     @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+    public void configure(Map<String, String> configuration)
             throws AsterixException {
-        this.recordType = outputType;
-        this.metaType = metaType;
         this.configuration = configuration;
+        ExternalDataUtils.validateDataSourceParameters(configuration);
         dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
-        dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
         if (dataSourceFactory.isIndexible() && (files != null)) {
             ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
         }
         dataSourceFactory.configure(configuration);
+        ExternalDataUtils.validateDataParserParameters(configuration);
+        dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
         dataParserFactory.setRecordType(recordType);
         dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
@@ -159,7 +160,22 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     }
 
     @Override
-    public ARecordType getAdapterOutputType() {
+    public ARecordType getOutputType() {
         return recordType;
     }
+
+    @Override
+    public void setOutputType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    @Override
+    public ARecordType getMetaType() {
+        return metaType;
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        this.metaType = metaType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 59a7514..505c4b2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -66,22 +66,16 @@ public interface IAdapterFactory extends Serializable {
 
     /**
      * @param configuration
-     * @param outputType
-     * @param metaType
      * @throws Exception
      */
-    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+    public void configure(Map<String, String> configuration)
             throws AsterixException;
 
-    public default void configure(final Map<String, String> configuration, final ARecordType outputType)
-            throws AsterixException {
-        configure(configuration, outputType, null);
-    }
+    public void setOutputType(ARecordType outputType);
 
-    /**
-     * Gets the record type associated with the output of the adapter
-     *
-     * @return
-     */
-    public ARecordType getAdapterOutputType();
+    public void setMetaType(ARecordType metaType);
+
+    public ARecordType getOutputType();
+
+    public ARecordType getMetaType();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
deleted file mode 100644
index 4b97e8d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-public interface IRecordWithMetaDataParser<T> extends IRecordDataParser<T> {
-    public void parseMeta(DataOutput out) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java
new file mode 100644
index 0000000..88120ed
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IRecordWithMetadataParser<T> extends IRecordDataParser<T> {
+    public void parseMeta(DataOutput out) throws IOException;
+
+    void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index aac7be2..b47d278 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -22,23 +22,23 @@ import java.io.IOException;
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
+public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> {
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader)
-            throws HyracksDataException {
+            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
+                    throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
     }
 
     @Override
     protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
-        dataParser.appendPK(tb);
+        ((IRecordWithMetadataParser<T>) dataParser).appendLastParsedPrimaryKeyToTuple(tb);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index e7c396b..44aac60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -22,27 +22,23 @@ import java.io.IOException;
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
-
-    //This field mask a super class field dataParser. We do this to avoid down-casting when calling parseMeta
-    protected RecordWithMetadataParser<T, O> dataParser;
+public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowController<T> {
 
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
+            FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
             IRecordReader<T> recordReader) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
-        this.dataParser = dataParser;
     }
 
     @Override
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
-        dataParser.parseMeta(tb.getDataOutput());
+        ((IRecordWithMetadataParser<T>) dataParser).parseMeta(tb.getDataOutput());
         tb.addFieldEndOffset();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
index ca6725f..3ed6af8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
@@ -154,7 +154,7 @@ public class RecordWithMetadataAndPK<T> extends RecordWithPK<T> {
     }
 
     @Override
-    public void appendPk(final ArrayTupleBuilder tb) throws IOException {
+    public void appendPrimaryKeyToTuple(final ArrayTupleBuilder tb) throws IOException {
         for (int i = 0; i < pkIndexes.length; i++) {
             if (keyIndicator[i] == 1) {
                 tb.addField(getMetadata(pkIndexes[i]));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
index b99d4d5..02ab028 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
@@ -86,7 +86,7 @@ public class RecordWithPK<T> implements IRawRecord<T> {
         record.set(t);
     }
 
-    public void appendPk(final ArrayTupleBuilder tb) throws IOException {
+    public void appendPrimaryKeyToTuple(final ArrayTupleBuilder tb) throws IOException {
         for (final ArrayBackedValueStorage pkStorage : pkFieldValueBuffers) {
             tb.addField(pkStorage);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
index 1d9311e..dc93533 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
@@ -51,7 +51,7 @@ public class DCPConverterFactory implements IRecordConverterFactory<DCPRequest,
 
     @Override
     public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter() {
-        return new DCPRequestToRecordWithMetadataAndPKConverter();
+        return new DCPMessageToRecordConverter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java
new file mode 100644
index 0000000..6ce5e98
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
+
+public class DCPMessageToRecordConverter
+        implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
+
+    private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
+    private final CharArrayRecord value;
+    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    private final ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private final CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private static final IAType[] CB_META_TYPES = new IAType[] { /*ID*/BuiltinType.ASTRING,
+            /*VBID*/BuiltinType.AINT32, /*SEQ*/BuiltinType.AINT64, /*CAS*/BuiltinType.AINT64,
+            /*EXPIRATION*/BuiltinType.AINT32,
+            /*FLAGS*/BuiltinType.AINT32, /*REV*/BuiltinType.AINT64, /*LOCK*/BuiltinType.AINT32 };
+    private static final int[] PK_INDICATOR = { 1 };
+    private static final int[] PK_INDEXES = { 0 };
+    private static final IAType[] PK_TYPES = { BuiltinType.ASTRING };
+
+    public DCPMessageToRecordConverter() {
+        this.value = new CharArrayRecord();
+        this.recordWithMetadata = new RecordWithMetadataAndPK<>(value, CB_META_TYPES,
+                ARecordType.FULLY_OPEN_RECORD_TYPE, PK_INDICATOR, PK_INDEXES, PK_TYPES);
+    }
+
+    @Override
+    public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends DCPRequest> input) throws IOException {
+        final DCPRequest dcpRequest = input.get();
+        if (dcpRequest instanceof MutationMessage) {
+            final MutationMessage message = (MutationMessage) dcpRequest;
+            try {
+                final String key = message.key();
+                final int vbucket = message.partition();
+                final long seq = message.bySequenceNumber();
+                final long cas = message.cas();
+                final int expiration = message.expiration();
+                final int flags = message.flags();
+                final long revSeqNumber = message.revisionSequenceNumber();
+                final int lockTime = message.lockTime();
+                int i = 0;
+                recordWithMetadata.reset();
+                recordWithMetadata.setMetadata(i++, key);
+                recordWithMetadata.setMetadata(i++, vbucket);
+                recordWithMetadata.setMetadata(i++, seq);
+                recordWithMetadata.setMetadata(i++, cas);
+                recordWithMetadata.setMetadata(i++, expiration);
+                recordWithMetadata.setMetadata(i++, flags);
+                recordWithMetadata.setMetadata(i++, revSeqNumber);
+                recordWithMetadata.setMetadata(i, lockTime);
+                DCPMessageToRecordConverter.set(message.content(), decoder, bytes, chars, value);
+            } finally {
+                ReferenceCountUtil.release(message.content());
+            }
+        } else if (dcpRequest instanceof RemoveMessage) {
+            final RemoveMessage message = (RemoveMessage) dcpRequest;
+            final String key = message.key();
+            recordWithMetadata.reset();
+            recordWithMetadata.setMetadata(0, key);
+        } else {
+            throw new HyracksDataException("Unknown DCP request: " + dcpRequest);
+        }
+        return recordWithMetadata;
+    }
+
+    public static void set(final ByteBuf content, final CharsetDecoder decoder, final ByteBuffer bytes,
+            final CharBuffer chars, final CharArrayRecord record) throws IOException {
+        int position = content.readerIndex();
+        final int limit = content.writerIndex();
+        final int contentSize = content.readableBytes();
+        while (position < limit) {
+            bytes.clear();
+            chars.clear();
+            if ((contentSize - position) < bytes.capacity()) {
+                bytes.limit(contentSize - position);
+            }
+            content.getBytes(position, bytes);
+            position += bytes.position();
+            bytes.flip();
+            decoder.decode(bytes, chars, false);
+            chars.flip();
+            record.append(chars);
+        }
+        record.endRecord();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
deleted file mode 100644
index 724699c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.converter;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-import com.couchbase.client.core.message.dcp.DCPRequest;
-import com.couchbase.client.core.message.dcp.MutationMessage;
-import com.couchbase.client.core.message.dcp.RemoveMessage;
-import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
-import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
-
-public class DCPRequestToRecordWithMetadataAndPKConverter
-        implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
-
-    private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
-    private final CharArrayRecord value;
-    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
-    private final ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    private final CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long),
-    // creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
-    private static final IAType[] CB_META_TYPES = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
-            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
-            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
-    private static final int[] PK_INDICATOR = { 1 };
-    private static final int[] PK_INDEXES = { 0 };
-    private static final IAType[] PK_TYPES = { BuiltinType.ASTRING };
-
-    public DCPRequestToRecordWithMetadataAndPKConverter() {
-        this.value = new CharArrayRecord();
-        this.recordWithMetadata = new RecordWithMetadataAndPK<char[]>(value, CB_META_TYPES,
-                ARecordType.FULLY_OPEN_RECORD_TYPE, PK_INDICATOR, PK_INDEXES, PK_TYPES);
-    }
-
-    @Override
-    public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends DCPRequest> input) throws IOException {
-        final DCPRequest dcpRequest = input.get();
-        if (dcpRequest instanceof MutationMessage) {
-            final MutationMessage message = (MutationMessage) dcpRequest;
-            final String key = message.key();
-            final int vbucket = message.partition();
-            final long seq = message.bySequenceNumber();
-            final String bucket = message.bucket();
-            final long cas = message.cas();
-            final long creationTime = message.creationTime();
-            final int expiration = message.expiration();
-            final int flags = message.flags();
-            final long revSeqNumber = message.revisionSequenceNumber();
-            final int lockTime = message.lockTime();
-            recordWithMetadata.reset();
-            recordWithMetadata.setMetadata(0, key);
-            recordWithMetadata.setMetadata(1, bucket);
-            recordWithMetadata.setMetadata(2, vbucket);
-            recordWithMetadata.setMetadata(3, seq);
-            recordWithMetadata.setMetadata(4, cas);
-            recordWithMetadata.setMetadata(5, creationTime);
-            recordWithMetadata.setMetadata(6, expiration);
-            recordWithMetadata.setMetadata(7, flags);
-            recordWithMetadata.setMetadata(8, revSeqNumber);
-            recordWithMetadata.setMetadata(9, lockTime);
-            DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value);
-            ReferenceCountUtil.release(message.content());
-        } else if (dcpRequest instanceof RemoveMessage) {
-            final RemoveMessage message = (RemoveMessage) dcpRequest;
-            final String key = message.key();
-            recordWithMetadata.reset();
-            recordWithMetadata.setMetadata(0, key);
-        } else {
-            throw new HyracksDataException("Unknown DCP request: " + dcpRequest);
-        }
-        return recordWithMetadata;
-    }
-
-    public static void set(final ByteBuf content, final CharsetDecoder decoder, final ByteBuffer bytes,
-            final CharBuffer chars, final CharArrayRecord record) throws IOException {
-        int position = content.readerIndex();
-        final int limit = content.writerIndex();
-        final int contentSize = content.readableBytes();
-        while (position < limit) {
-            bytes.clear();
-            chars.clear();
-            if ((contentSize - position) < bytes.capacity()) {
-                bytes.limit(contentSize - position);
-            }
-            content.getBytes(position, bytes);
-            position += bytes.position();
-            bytes.flip();
-            decoder.decode(bytes, chars, false);
-            chars.flip();
-            record.append(chars);
-        }
-        record.endRecord();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index b1fd7a0..58e9ea7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -110,7 +110,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
                 adaptorLibraryName);
         if (classLoader != null) {
             adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
-            adapterFactory.configure(adaptorConfiguration, adapterOutputType);
+            adapterFactory.setOutputType(adapterOutputType);
+            adapterFactory.configure(adaptorConfiguration);
         } else {
             String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName
                     + " in dataverse " + feedId.getDataverse();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 5c46a42..be308d4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -26,7 +26,7 @@ import org.apache.asterix.external.api.IDataParser;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.api.IRecordWithMetaDataParser;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableString;
@@ -39,7 +39,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class RecordWithMetadataParser<T, O> implements IRecordWithMetaDataParser<T> {
+public class RecordWithMetadataParser<T, O> implements IRecordWithMetadataParser<T> {
 
     private final IRecordConverter<T, RecordWithMetadataAndPK<O>> converter;
     private RecordWithMetadataAndPK<O> rwm;
@@ -106,7 +106,8 @@ public class RecordWithMetadataParser<T, O> implements IRecordWithMetaDataParser
         }
     }
 
-    public void appendPK(ArrayTupleBuilder tb) throws IOException {
-        rwm.appendPk(tb);
+    @Override
+    public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws IOException {
+        rwm.appendPrimaryKeyToTuple(tb);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index e39e3bd..8f6c85f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -28,7 +28,6 @@ import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
-import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 
@@ -37,13 +36,14 @@ import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
  */
 public class AdapterFactoryProvider {
 
-    // Internal Adapters
+    // Adapters
     public static IAdapterFactory getAdapterFactory(String adapterName, Map<String, String> configuration,
             ARecordType itemType, ARecordType metaType) throws AsterixException {
         ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
-        ExternalDataUtils.validateParameters(configuration);
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
-        adapterFactory.configure(configuration, itemType, metaType);
+        adapterFactory.setOutputType(itemType);
+        adapterFactory.setMetaType(metaType);
+        adapterFactory.configure(configuration);
         return adapterFactory;
     }
 
@@ -52,10 +52,11 @@ public class AdapterFactoryProvider {
             Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
             ARecordType metaType) throws AsterixException {
         ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
-        ExternalDataUtils.validateParameters(configuration);
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
+        adapterFactory.setOutputType(itemType);
+        adapterFactory.setMetaType(metaType);
         adapterFactory.setSnapshot(snapshot, indexingOp);
-        adapterFactory.configure(configuration, itemType, metaType);
+        adapterFactory.configure(configuration);
         return adapterFactory;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 50ebb71..4ad08b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -32,6 +32,7 @@ import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.api.IRecordWithPKDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.api.IStreamDataParserFactory;
@@ -44,7 +45,6 @@ import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.parser.RecordWithMetadataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedLogManager;
@@ -59,7 +59,7 @@ public class DataflowControllerProvider {
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
@@ -80,10 +80,10 @@ public class DataflowControllerProvider {
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (RecordWithMetadataParser) dataParser, recordReader);
+                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
-                                        (RecordWithMetadataParser) dataParser, recordReader);
+                                        (IRecordWithMetadataParser) dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 2e1a5a7..b37198a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -45,8 +45,11 @@ public class ParserFactoryProvider {
             return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
                     parserFactoryName);
         } else {
-            parserFactory = ParserFactoryProvider
-                    .getDataParserFactory(ExternalDataUtils.getRecordFormat(configuration));
+            String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration);
+            if (parserFactoryKey == null) {
+                parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
+            }
+            parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
         }
         return parserFactory;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index ea8bc98..d11e97f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -54,7 +54,7 @@ public class StreamRecordReaderProvider {
             }
             throw new AsterixException("Unknown format: " + format);
         }
-        throw new AsterixException("Unspecified paramter: " + ExternalDataConstants.KEY_FORMAT);
+        throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT);
     }
 
     public static StreamRecordReader createRecordReader(Format format, AsterixInputStream inputStream,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c992723..55dee04 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -55,7 +55,7 @@ public class ExternalDataConstants {
     public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
     public static final String KEY_SOURCE_DATATYPE = "type-name";
     public static final String KEY_DELIMITER = "delimiter";
-    public static final String KEY_PARSER_FACTORY = "tuple-parser";
+    public static final String KEY_PARSER_FACTORY = "parser-factory";
     public static final String KEY_DATA_PARSER = "parser";
     public static final String KEY_HEADER = "header";
     public static final String KEY_READER = "reader";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index e33949e..69882d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -78,14 +78,26 @@ public class ExternalDataUtils {
     }
 
     public static void validateParameters(Map<String, String> configuration) throws AsterixException {
+        validateDataSourceParameters(configuration);
+        validateDataParserParameters(configuration);
+    }
+
+    public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
+        String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (parser == null) {
+            String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
+            if (parserFactory == null) {
+                throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or "
+                        + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified.");
+            }
+        }
+    }
+
+    public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
         String reader = configuration.get(ExternalDataConstants.KEY_READER);
         if (reader == null) {
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
         }
-        String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
-        if (parser == null) {
-            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified.");
-        }
     }
 
     public static DataSourceType getDataSourceType(Map<String, String> configuration) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
index b08fc7d..5e4e5eb 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
@@ -20,7 +20,7 @@ package org.apache.asterix.external.generator.test;
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
-import org.apache.asterix.external.input.record.converter.DCPRequestToRecordWithMetadataAndPKConverter;
+import org.apache.asterix.external.input.record.converter.DCPMessageToRecordConverter;
 import org.apache.asterix.external.input.record.reader.kv.KVTestReader;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.junit.Test;
@@ -34,7 +34,7 @@ public class DCPGeneratorTest {
         try (KVTestReader cbreader = new KVTestReader(0, "TestBucket",
                 new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, 150, 0, 0, 0)) {
             final UTF8StringPointable pointable = new UTF8StringPointable();
-            final DCPRequestToRecordWithMetadataAndPKConverter converter = new DCPRequestToRecordWithMetadataAndPKConverter();
+            final DCPMessageToRecordConverter converter = new DCPMessageToRecordConverter();
             while (cbreader.hasNext()) {
                 final IRawRecord<DCPRequest> dcp = cbreader.next();
                 final RecordWithMetadataAndPK<char[]> record = converter.convert(dcp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
index a5e422c..adffe1e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
@@ -791,6 +791,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
     }
 
     public boolean parseNext(ClassAd classad) throws IOException {
+        resetPools();
         return parseClassAd(currentSource, classad, false);
     }
 
@@ -1350,7 +1351,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
                     if (!parseArgumentList(argList)) {
                         tree.setInnerTree(null);
                         return false;
-                    } ;
+                    };
                     // special case function-calls should be converted
                     // into a literal expression if the argument is a
                     // string literal
@@ -1398,7 +1399,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
                 tree.setInnerTree(Operation.createOperation(Operation.OpKind_PARENTHESES_OP, treeL, objectPool));
                 return (tree.size() != 0);
             }
-            // constants
+                // constants
             case LEX_OPEN_BOX: {
                 isExpr = true;
                 ClassAd newAd = objectPool.classAdPool.get();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d1a4532..e029c09 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -77,9 +77,11 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
                 ADMDataParser parser;
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;
-                IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx
-                        .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext()
-                                .getApplicationObject();
+                IAsterixPropertiesProvider propertiesProvider =
+                        (IAsterixPropertiesProvider) ((NodeControllerService) ctx
+                                .getJobletContext().getApplicationContext().getControllerService())
+                                        .getApplicationContext()
+                                        .getApplicationObject();
                 ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
                         .get(nodeId)[0];
                 try {
@@ -125,13 +127,27 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
     }
 
     @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
     }
 
     @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType) {
-        this.configuration = configuration;
+    public void setOutputType(ARecordType outputType) {
         this.outputType = outputType;
     }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+    }
+
+    @Override
+    public ARecordType getOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public ARecordType getMetaType() {
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
index 3d86abd..fd617bd 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
@@ -43,7 +43,7 @@ public class TestRecordWithPKParser<T> implements IRecordWithPKDataParser<Record
     @Override
     public void appendKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends RecordWithPK<T>> record)
             throws IOException {
-        record.get().appendPk(tb);
+        record.get().appendPrimaryKeyToTuple(tb);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index faff9df..c0c8eb6 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -121,7 +121,7 @@ public class RecordWithMetaTest {
                 tb.addFieldEndOffset();
                 parser.parseMeta(tb.getDataOutput());
                 tb.addFieldEndOffset();
-                parser.appendPK(tb);
+                parser.appendLastParsedPrimaryKeyToTuple(tb);
                 //print tuple
                 printTuple(tb, printers, printStream);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/67dabe36/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 08a98a4..1a51b74 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -453,6 +454,71 @@ public class FeedMetadataUtil {
         return preProcessingRequired;
     }
 
+    public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx)
+            throws AsterixException {
+        try {
+            String adapterName = feed.getAdapterName();
+            Map<String, String> configuration = feed.getAdapterConfiguration();
+            ARecordType adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+            ARecordType metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
+            ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
+            ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
+            // Get adapter from metadata dataset <Metadata dataverse>
+            DatasourceAdapter adapterEntity =
+                    MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                            adapterName);
+            // Get adapter from metadata dataset <The feed dataverse>
+            if (adapterEntity == null) {
+                adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
+            }
+            AdapterType adapterType;
+            IAdapterFactory adapterFactory;
+            if (adapterEntity != null) {
+                adapterType = adapterEntity.getType();
+                String adapterFactoryClassname = adapterEntity.getClassname();
+                switch (adapterType) {
+                    case INTERNAL:
+                        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                        break;
+                    case EXTERNAL:
+                        String[] anameComponents = adapterName.split("#");
+                        String libraryName = anameComponents[0];
+                        ClassLoader cl =
+                                ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName);
+                        adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        break;
+                    default:
+                        throw new AsterixException("Unknown Adapter type " + adapterType);
+                }
+                adapterFactory.setOutputType(adapterOutputType);
+                adapterFactory.setMetaType(metaType);
+                adapterFactory.configure(configuration);
+            } else {
+                AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType,
+                        metaType);
+            }
+            if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
+                metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
+                if (metaType == null) {
+                    throw new AsterixException("Unknown specified feed meta output data type "
+                            + configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
+                }
+            }
+            if (adapterOutputType == null) {
+                if (!configuration.containsKey(ExternalDataConstants.KEY_TYPE_NAME)) {
+                    throw new AsterixException("Unspecified feed output data type");
+                }
+                adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+                if (adapterOutputType == null) {
+                    throw new AsterixException("Unknown specified feed output data type "
+                            + configuration.get(ExternalDataConstants.KEY_TYPE_NAME));
+                }
+            }
+        } catch (Exception e) {
+            throw new AsterixException("Invalid feed parameters", e);
+        }
+    }
+
     @SuppressWarnings("rawtypes")
     public static Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType>
             getPrimaryFeedFactoryAndOutput(Feed feed, FeedPolicyAccessor policyAccessor,
@@ -494,13 +560,26 @@ public class FeedMetadataUtil {
                                 ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName);
                         adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
+                    default:
+                        throw new AsterixException("Unknown Adapter type " + adapterType);
                 }
-                adapterFactory.configure(configuration, adapterOutputType, metaType);
+                adapterFactory.setOutputType(adapterOutputType);
+                adapterFactory.setMetaType(metaType);
+                adapterFactory.configure(configuration);
             } else {
                 adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType,
                         metaType);
                 adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
             }
+            if (metaType == null) {
+                metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
+            }
+            if (adapterOutputType == null) {
+                if (!configuration.containsKey(ExternalDataConstants.KEY_TYPE_NAME)) {
+                    throw new AsterixException("Unspecified feed output data type");
+                }
+                adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+            }
             int numOfOutputs = 1;
             if (metaType != null) {
                 numOfOutputs++;
@@ -516,27 +595,7 @@ public class FeedMetadataUtil {
                 serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
             }
             if (ExternalDataUtils.isChangeFeed(configuration)) {
-                int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
-                if (metaType != null) {
-                    int[] pkIndicators = ExternalDataUtils.getPKSourceIndicators(configuration);
-                    for (int j = 0; j < pkIndexes.length; j++) {
-                        int aInt = pkIndexes[j];
-                        if (pkIndicators[j] == 0) {
-                            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
-                                    .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
-                        } else if (pkIndicators[j] == 1) {
-                            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
-                                    .getSerializerDeserializer(metaType.getFieldTypes()[aInt]);
-                        } else {
-                            throw new AlgebricksException("a key source indicator can only be 0 or 1");
-                        }
-                    }
-                } else {
-                    for (int aInt : pkIndexes) {
-                        serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
-                                .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
-                    }
-                }
+                getSerdesForPKs(serdes, configuration, metaType, adapterOutputType, i);
             }
             feedProps = new Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType>(adapterFactory,
                     new RecordDescriptor(serdes), adapterType);
@@ -546,6 +605,32 @@ public class FeedMetadataUtil {
         return feedProps;
     }
 
+    @SuppressWarnings("rawtypes")
+    private static void getSerdesForPKs(ISerializerDeserializer[] serdes, Map<String, String> configuration,
+            ARecordType metaType, ARecordType adapterOutputType, int index) throws AlgebricksException {
+        int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
+        if (metaType != null) {
+            int[] pkIndicators = ExternalDataUtils.getPKSourceIndicators(configuration);
+            for (int j = 0; j < pkIndexes.length; j++) {
+                int aInt = pkIndexes[j];
+                if (pkIndicators[j] == 0) {
+                    serdes[index++] = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
+                } else if (pkIndicators[j] == 1) {
+                    serdes[index++] = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(metaType.getFieldTypes()[aInt]);
+                } else {
+                    throw new AlgebricksException("a key source indicator can only be 0 or 1");
+                }
+            }
+        } else {
+            for (int aInt : pkIndexes) {
+                serdes[index++] = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
+            }
+        }
+    }
+
     public static ARecordType getOutputType(IFeed feed, Map<String, String> configuration, String key)
             throws RemoteException, ACIDException, MetadataException {
         ARecordType outputType = null;
@@ -596,7 +681,7 @@ public class FeedMetadataUtil {
 
     public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor,
             MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+                    throws AlgebricksException, MetadataException, RemoteException, ACIDException {
         String outputType = null;
         String primaryFeedName = feed.getSourceFeedName();
         Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);


Mime
View raw message