asterixdb-commits mailing list archives

From: amo...@apache.org
Subject: [01/19] incubator-asterixdb git commit: Support Change Feeds and Ingestion of Records with MetaData
Date: Tue, 15 Mar 2016 23:36:17 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 205e4900e -> d3338f665


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-installer/src/test/resources/integrationts/lifecycle/results/asterix-lifecycle/backupRestore/backupRestore.1.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/lifecycle/results/asterix-lifecycle/backupRestore/backupRestore.1.adm b/asterix-installer/src/test/resources/integrationts/lifecycle/results/asterix-lifecycle/backupRestore/backupRestore.1.adm
index f28e810..7523328 100644
--- a/asterix-installer/src/test/resources/integrationts/lifecycle/results/asterix-lifecycle/backupRestore/backupRestore.1.adm
+++ b/asterix-installer/src/test/resources/integrationts/lifecycle/results/asterix-lifecycle/backupRestore/backupRestore.1.adm
@@ -1 +1 @@
-{ "DataverseName": "backupDataverse", "DataFormat": "org.apache.asterix.runtime.formats.NonTaggedDataFormat", "Timestamp": "Wed Apr 24 16:13:46 PDT 2013", "PendingOp": 0 }
+{ "DataverseName": "backupDataverse", "DataFormat": "org.apache.asterix.runtime.formats.NonTaggedDataFormat", "Timestamp": "Wed Apr 24 16:13:46 PDT 2013", "PendingOp": 0i32 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
index 47f5975..c0b4919 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
@@ -30,6 +30,7 @@
 use dataverse TinySocial;
 
 load dataset FacebookUsers using localfs
-(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),
+("format"="adm"));
 
 insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 3f85ba9..ae657d4 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -19,18 +19,19 @@
 package org.apache.asterix.lang.aql.statement;
 
 import java.io.StringReader;
+import java.rmi.RemoteException;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
@@ -45,9 +46,7 @@ import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 
 /**
  * Represents the AQL statement for subscribing to a feed.
@@ -58,7 +57,7 @@ public class SubscribeFeedStatement implements Statement {
     private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
     private final FeedConnectionRequest connectionRequest;
     private Query query;
-    private int varCounter;
+    private final int varCounter;
     private final String[] locations;
 
     public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
@@ -106,7 +105,7 @@ public class SubscribeFeedStatement implements Statement {
                 + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")");
 
         List<String> functionsToApply = connectionRequest.getFunctionsToApply();
-        if (functionsToApply != null && functionsToApply.isEmpty()) {
+        if ((functionsToApply != null) && functionsToApply.isEmpty()) {
             builder.append(" return $x");
         } else {
             String rValueName = "x";
@@ -186,10 +185,9 @@ public class SubscribeFeedStatement implements Statement {
         try {
             switch (feed.getFeedType()) {
                 case PRIMARY:
-                    Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> factoryOutput = null;
-
-                    factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx);
-                    outputType = factoryOutput.second.getTypeName();
+                    outputType = FeedMetadataUtil
+                            .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME)
+                            .getTypeName();
                     break;
                 case SECONDARY:
                     outputType = FeedMetadataUtil.getSecondaryFeedOutput(feed, policyAccessor, mdTxnCtx);
@@ -197,7 +195,7 @@ public class SubscribeFeedStatement implements Statement {
             }
             return outputType;
 
-        } catch (AlgebricksException ae) {
+        } catch (AlgebricksException | RemoteException | ACIDException ae) {
             throw new MetadataException(ae);
         }
     }
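
The widened catch above mirrors the new checked exceptions (RemoteException, ACIDException) that the getOutputType call can raise. A minimal, self-contained sketch of the wrapping pattern, with hypothetical stand-in classes rather than the real AsterixDB exceptions:

    import java.rmi.RemoteException;

    public class ExceptionWrapSketch {

        // Hypothetical stand-in for MetadataException.
        static class MetadataLookupException extends Exception {
            MetadataLookupException(Throwable cause) {
                super(cause);
            }
        }

        // Stand-in for a metadata call with more than one failure mode.
        static String fetchTypeName(boolean remoteDown) throws RemoteException {
            if (remoteDown) {
                throw new RemoteException("metadata node unreachable");
            }
            return "FeedOutputType";
        }

        // Collapse the distinct failures into one domain exception, keeping
        // the original as the cause, as the widened catch above does.
        static String typeNameOrFail(boolean remoteDown) throws MetadataLookupException {
            try {
                return fetchTypeName(remoteDown);
            } catch (RemoteException | RuntimeException e) {
                throw new MetadataLookupException(e);
            }
        }

        public static void main(String[] args) throws MetadataLookupException {
            System.out.println(typeNameOrFail(false)); // prints FeedOutputType
        }
    }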

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
index e2605ec..a5347fc 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
@@ -46,13 +46,13 @@ import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitione
 
 public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
 
-    private final AqlSourceId id;
-    private final IAType itemType;
-    private final IAType metaItemType;
-    private final AqlDataSourceType datasourceType;
+    protected final AqlSourceId id;
+    protected final IAType itemType;
+    protected final IAType metaItemType;
+    protected final AqlDataSourceType datasourceType;
     protected IAType[] schemaTypes;
     protected INodeDomain domain;
-    private Map<String, Serializable> properties = new HashMap<>();
+    protected Map<String, Serializable> properties = new HashMap<>();
 
     public enum AqlDataSourceType {
         INTERNAL_DATASET,
@@ -142,7 +142,7 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
                         for (LogicalVariable v : scanVariables) {
                             pvars.add(v);
                             ++i;
-                            if (i >= n - 1) {
+                            if (i >= (n - 1)) {
                                 break;
                             }
                         }
@@ -162,7 +162,7 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
                         for (LogicalVariable v : scanVariables) {
                             pvars.add(v);
                             ++i;
-                            if (i >= n - 1) {
+                            if (i >= (n - 1)) {
                                 break;
                             }
                         }
@@ -170,7 +170,7 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
                     }
                     propsLocal = new ArrayList<ILocalStructuralProperty>();
                     List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
-                    for (int i = 0; i < n - 1; i++) {
+                    for (int i = 0; i < (n - 1); i++) {
                         orderColumns.add(new OrderColumn(scanVariables.get(i), OrderKind.ASC));
                     }
                     propsLocal.add(new LocalOrderProperty(orderColumns));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index e0084f8..650dc21 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -71,6 +71,7 @@ import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -180,6 +181,7 @@ import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescripto
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+
     private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
     private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
@@ -334,7 +336,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         try {
             switch (((AqlDataSource) dataSource).getDatasourceType()) {
                 case FEED:
-                    return buildFeedCollectRuntime(jobSpec, dataSource);
+                    return buildFeedCollectRuntime(jobSpec, (FeedDataSource) dataSource);
                 case INTERNAL_DATASET: {
                     // querying an internal dataset
                     return buildInternalDatasetScan(jobSpec, scanVariables, minFilterVars, maxFilterVars, opSchema,
@@ -349,7 +351,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
                     ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
                     IAdapterFactory adapterFactory = getConfiguredAdapterFactory(dataset, edd.getAdapter(),
-                            edd.getProperties(), itemType, false, null);
+                            edd.getProperties(), (ARecordType) itemType, false, null, null);
                     return buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
                             NonTaggedDataFormat.INSTANCE);
                 }
@@ -363,7 +365,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     int pkIndex = 0;
                     IAdapterFactory adapterFactory = getConfiguredAdapterFactory(alds.getTargetDataset(),
                             alds.getAdapter(), alds.getAdapterProperties(), itemType, isPKAutoGenerated,
-                            partitioningKeys);
+                            partitioningKeys, null);
                     RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
                     return buildLoadableDatasetScan(jobSpec, alds, adapterFactory, rDesc, isPKAutoGenerated,
                             partitioningKeys, itemType, pkIndex);
@@ -380,18 +382,27 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     @SuppressWarnings("rawtypes")
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedCollectRuntime(JobSpecification jobSpec,
-            IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
-
-        FeedDataSource feedDataSource = (FeedDataSource) dataSource;
-        FeedCollectOperatorDescriptor feedCollector = null;
+            FeedDataSource feedDataSource) throws AlgebricksException {
 
         try {
             ARecordType feedOutputType = (ARecordType) feedDataSource.getItemType();
             ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
                     .getSerializerDeserializer(feedOutputType);
-            RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-            FeedPolicyEntity feedPolicy = (FeedPolicyEntity) ((AqlDataSource) dataSource).getProperties()
+            IAType metaType = feedDataSource.getMetaItemType();
+            List<IAType> pkTypes = feedDataSource.getPkTypes();
+            ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
+            serdes.add(payloadSerde);
+            if (metaType != null) {
+                serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType));
+            }
+            if (pkTypes != null) {
+                for (IAType type : pkTypes) {
+                    serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
+                }
+            }
+            RecordDescriptor feedDesc = new RecordDescriptor(
+                    serdes.toArray(new ISerializerDeserializer[serdes.size()]));
+            FeedPolicyEntity feedPolicy = (FeedPolicyEntity) feedDataSource.getProperties()
                     .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
             if (feedPolicy == null) {
                 throw new AlgebricksException("Feed not configured with a policy");
@@ -399,7 +410,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
             FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(),
                     feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset());
-            feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
+            FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
                     feedDataSource.getSourceFeedId(), feedOutputType, feedDesc, feedPolicy.getProperties(),
                     feedDataSource.getLocation());
 
@@ -542,12 +553,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
-            Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated,
-            List<List<String>> primaryKeys) throws AlgebricksException {
+            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
+            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
         try {
             configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
             IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
-                    (ARecordType) itemType);
+                    itemType, metaType);
 
             // check to see if dataset is indexed
             Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -599,20 +610,22 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> factoryOutput = null;
+        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
         factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
+        ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
+                ExternalDataConstants.KEY_TYPE_NAME);
         IAdapterFactory adapterFactory = factoryOutput.first;
         FeedIntakeOperatorDescriptor feedIngestor = null;
         switch (factoryOutput.third) {
             case INTERNAL:
-                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory,
-                        factoryOutput.second, policyAccessor);
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, recordType,
+                        policyAccessor, factoryOutput.second);
                 break;
             case EXTERNAL:
                 String libraryName = primaryFeed.getAdapterName().trim()
                         .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
                 feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
-                        adapterFactory.getClass().getName(), factoryOutput.second, policyAccessor);
+                        adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
                 break;
         }
 
@@ -633,7 +646,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             boolean temp = dataset.getDatasetDetails().isTemp();
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
-            if (primaryIndex != null && dataset.getDatasetType() != DatasetType.EXTERNAL) {
+            if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
                 isSecondary = !indexName.equals(primaryIndex.getIndexName());
             }
             int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
@@ -1095,8 +1108,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
-                    throws AlgebricksException {
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
+            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
 
         String datasetName = dataSource.getId().getDatasourceName();
         Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
@@ -1110,7 +1123,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         int numKeys = keys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
         // Move key fields to front.
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
+                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
         int[] bloomFilterKeyFields = new int[numKeys];
         int i = 0;
         for (LogicalVariable varKey : keys) {
@@ -1119,10 +1133,16 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             bloomFilterKeyFields[i] = i;
             i++;
         }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
         if (numFilterFields > 0) {
             int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys + 1] = idx;
+            fieldPermutation[i++] = idx;
+        }
+        if (additionalNonFilteringFields != null) {
+            for (LogicalVariable variable : additionalNonFilteringFields) {
+                int idx = propagatedSchema.findVariable(variable);
+                fieldPermutation[i++] = idx;
+            }
         }
 
         try {
@@ -1196,10 +1216,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
-                    throws AlgebricksException {
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, boolean bulkload) throws AlgebricksException {
         return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
-                additionalNonKeyFields, recordDesc, context, spec, bulkload);
+                additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
     }
 
     @Override
@@ -1208,7 +1228,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
         return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
-                additionalNonKeyFields, recordDesc, context, spec, false);
+                additionalNonKeyFields, recordDesc, context, spec, false, null);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
@@ -1503,7 +1523,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp,
                     splitsAndConstraint.second);
 
-        } catch (MetadataException e) {
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
@@ -1671,7 +1691,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                         NoOpOperationCallbackFactory.INSTANCE);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (MetadataException e) {
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
@@ -1766,9 +1786,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             // SecondaryKeys.size() can be two if it comes from the bulkload.
             // In this case, [token, number of token] are the secondaryKeys.
-            if (!isPartitioned || secondaryKeys.size() > 1) {
+            if (!isPartitioned || (secondaryKeys.size() > 1)) {
                 numTokenFields = secondaryKeys.size();
-            } else if (isPartitioned && secondaryKeys.size() == 1) {
+            } else if (isPartitioned && (secondaryKeys.size() == 1)) {
                 numTokenFields = secondaryKeys.size() + 1;
             }
 
@@ -1881,7 +1901,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                         indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (MetadataException e) {
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
@@ -2193,7 +2213,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             // Create the adapter factory <- right now there is only one. if there are more in the future, we can create
             // a map->
             ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-            LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getAdapterFactory(
+            LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(
                     datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainNull,
                     context.getNullWriterFactory());
 
@@ -2236,13 +2256,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         }
     }
 
-    //TODO: refactor this method
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
-            LogicalVariable prevPayload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-                    throws AlgebricksException {
+            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
         String datasetName = dataSource.getId().getDatasourceName();
         Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
         if (dataset == null) {
@@ -2254,8 +2273,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
         int numKeys = primaryKeys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
         // Move key fields to front. {keys, record, filters}
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
         int[] bloomFilterKeyFields = new int[numKeys];
         int i = 0;
         // set the keys' permutations
@@ -2266,11 +2286,18 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             i++;
         }
         // set the record permutation
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
         // set the filters' permutations.
         if (numFilterFields > 0) {
             int idx = propagatedSchema.findVariable(filterKeys.get(0));
-            fieldPermutation[numKeys + 1] = idx;
+            fieldPermutation[i++] = idx;
+        }
+
+        if (additionalNonFilterFields != null) {
+            for (LogicalVariable var : additionalNonFilterFields) {
+                int idx = propagatedSchema.findVariable(var);
+                fieldPermutation[i++] = idx;
+            }
         }
 
         try {
@@ -2534,9 +2561,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             // SecondaryKeys.size() can be two if it comes from the bulkload.
             // In this case, [token, number of token] are the secondaryKeys.
-            if (!isPartitioned || secondaryKeys.size() > 1) {
+            if (!isPartitioned || (secondaryKeys.size() > 1)) {
                 numTokenFields = secondaryKeys.size();
-            } else if (isPartitioned && secondaryKeys.size() == 1) {
+            } else if (isPartitioned && (secondaryKeys.size() == 1)) {
                 numTokenFields = secondaryKeys.size() + 1;
             }
 
@@ -2641,7 +2668,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, prevFieldPermutation);
 
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (MetadataException e) {
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
@@ -2944,7 +2971,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
                     NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (MetadataException e) {
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
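
The insert/upsert runtimes above now lay out each tuple as {primary keys, payload record, optional filter field, additional non-filtering fields such as the meta record}, and size the permutation array accordingly. A small self-contained sketch of that permutation logic, using plain strings in place of LogicalVariable and IOperatorSchema (all names hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class FieldPermutationSketch {

        // Stand-in for IOperatorSchema.findVariable: the position of a
        // variable in the propagated schema.
        static int findVariable(List<String> schema, String var) {
            return schema.indexOf(var);
        }

        // Order: keys first, then the payload record, then the filter field
        // (if any), then any additional fields (e.g. the meta record).
        static int[] buildPermutation(List<String> schema, List<String> keys,
                String payload, String filterVar, List<String> additionalFields) {
            int numFilterFields = (filterVar == null) ? 0 : 1;
            int numAdditional = (additionalFields == null) ? 0 : additionalFields.size();
            int[] fieldPermutation = new int[keys.size() + 1 + numFilterFields + numAdditional];
            int i = 0;
            for (String key : keys) {
                fieldPermutation[i++] = findVariable(schema, key);
            }
            fieldPermutation[i++] = findVariable(schema, payload);
            if (filterVar != null) {
                fieldPermutation[i++] = findVariable(schema, filterVar);
            }
            if (additionalFields != null) {
                for (String v : additionalFields) {
                    fieldPermutation[i++] = findVariable(schema, v);
                }
            }
            return fieldPermutation;
        }

        public static void main(String[] args) {
            List<String> schema = Arrays.asList("$meta", "$record", "$pk");
            int[] perm = buildPermutation(schema, Arrays.asList("$pk"), "$record",
                    null, Arrays.asList("$meta"));
            System.out.println(Arrays.toString(perm)); // prints [2, 1, 0]
        }
    }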

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 46e3007..21e5729 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -18,57 +18,46 @@
  */
 package org.apache.asterix.metadata.declared;
 
+import java.util.List;
+
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 
 public class FeedDataSource extends AqlDataSource {
 
-    private Feed feed;
+    private final Feed feed;
     private final FeedId sourceFeedId;
     private final IFeed.FeedType sourceFeedType;
     private final ConnectionLocation location;
     private final String targetDataset;
     private final String[] locations;
     private final int computeCardinality;
+    private final List<IAType> pkTypes;
+    private final List<ScalarFunctionCallExpression> keyAccessExpression;
 
-    public FeedDataSource(AqlSourceId id, String targetDataset, IAType itemType, IAType metaItemType,
-            AqlDataSourceType dataSourceType, FeedId sourceFeedId, IFeed.FeedType sourceFeedType,
+    public FeedDataSource(Feed feed, AqlSourceId id, String targetDataset, IAType itemType, IAType metaType,
+            List<IAType> pkTypes, List<List<String>> partitioningKeys,
+            List<ScalarFunctionCallExpression> keyAccessExpression, FeedId sourceFeedId, IFeed.FeedType sourceFeedType,
             ConnectionLocation location, String[] locations) throws AlgebricksException {
-        super(id, itemType, metaItemType, dataSourceType);
+        super(id, itemType, metaType, AqlDataSourceType.FEED);
+        this.feed = feed;
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
         this.sourceFeedType = sourceFeedType;
         this.location = location;
         this.locations = locations;
+        this.pkTypes = pkTypes;
+        this.keyAccessExpression = keyAccessExpression;
         this.computeCardinality = AsterixClusterProperties.INSTANCE.getParticipantNodes().size();
-        MetadataTransactionContext ctx = null;
-        try {
-            MetadataManager.INSTANCE.acquireReadLatch();
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            this.feed = MetadataManager.INSTANCE.getFeed(ctx, id.getDataverseName(), id.getDatasourceName());
-            MetadataManager.INSTANCE.commitTransaction(ctx);
-            initFeedDataSource(itemType);
-        } catch (Exception e) {
-            if (ctx != null) {
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    throw new IllegalStateException("Unable to abort " + e2.getMessage());
-                }
-            }
-
-        } finally {
-            MetadataManager.INSTANCE.releaseReadLatch();
-        }
+        initFeedDataSource();
     }
 
     public Feed getFeed() {
@@ -96,9 +85,19 @@ public class FeedDataSource extends AqlDataSource {
         return locations;
     }
 
-    private void initFeedDataSource(IAType itemType) {
-        schemaTypes = new IAType[1];
-        schemaTypes[0] = itemType;
+    private void initFeedDataSource() {
+        int i = 0;
+        // record + meta (if exists) + PKs (if exists)
+        schemaTypes = new IAType[(1 + (metaItemType != null ? 1 : 0) + (pkTypes != null ? pkTypes.size() : 0))];
+        schemaTypes[i++] = itemType;
+        if (metaItemType != null) {
+            schemaTypes[i++] = metaItemType;
+        }
+        if (pkTypes != null) {
+            for (IAType type : pkTypes) {
+                schemaTypes[i++] = type;
+            }
+        }
         INodeDomain domainForExternalData = new INodeDomain() {
             @Override
             public Integer cardinality() {
@@ -120,4 +119,37 @@ public class FeedDataSource extends AqlDataSource {
     public int getComputeCardinality() {
         return computeCardinality;
     }
+
+    public List<IAType> getPkTypes() {
+        return pkTypes;
+    }
+
+    public List<ScalarFunctionCallExpression> getKeyAccessExpression() {
+        return keyAccessExpression;
+    }
+
+    @Override
+    public LogicalVariable getMetaVariable(List<LogicalVariable> dataScanVariables) {
+        return metaItemType == null ? null : dataScanVariables.get(1);
+    }
+
+    @Override
+    public LogicalVariable getDataRecordVariable(List<LogicalVariable> dataScanVariables) {
+        return dataScanVariables.get(0);
+    }
+
+    public boolean isChange() {
+        return pkTypes != null;
+    }
+
+    public List<LogicalVariable> getPkVars(List<LogicalVariable> allVars) {
+        if (pkTypes == null) {
+            return null;
+        }
+        if (metaItemType != null) {
+            return allVars.subList(2, allVars.size());
+        } else {
+            return allVars.subList(1, allVars.size());
+        }
+    }
 }
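
FeedDataSource above fixes a positional convention for feed scan variables: the data record first, the meta record second when one is declared, and the primary keys (for change feeds) in the remaining positions. A minimal sketch of that layout, with List<String> standing in for the logical-variable list (names illustrative, not AsterixDB APIs):

    import java.util.Arrays;
    import java.util.List;

    public class FeedVariableLayoutSketch {

        // Position 0 is always the data record.
        static String dataRecordVar(List<String> scanVars) {
            return scanVars.get(0);
        }

        // Position 1 is the meta record, when the feed declares a meta type.
        static String metaVar(List<String> scanVars, boolean hasMeta) {
            return hasMeta ? scanVars.get(1) : null;
        }

        // For change feeds, the primary keys occupy the remaining positions.
        static List<String> pkVars(List<String> scanVars, boolean hasMeta,
                boolean isChangeFeed) {
            if (!isChangeFeed) {
                return null;
            }
            return hasMeta ? scanVars.subList(2, scanVars.size())
                    : scanVars.subList(1, scanVars.size());
        }

        public static void main(String[] args) {
            List<String> vars = Arrays.asList("$record", "$meta", "$pk0", "$pk1");
            System.out.println(dataRecordVar(vars));      // $record
            System.out.println(metaVar(vars, true));      // $meta
            System.out.println(pkVars(vars, true, true)); // [$pk0, $pk1]
        }
    }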

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 1f815c0..78c6587 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata.feeds;
 
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -32,6 +33,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
@@ -45,12 +47,13 @@ import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
-import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
@@ -79,7 +82,9 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -92,7 +97,7 @@ import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageCon
  */
 public class FeedMetadataUtil {
 
-    private static Logger LOGGER = Logger.getLogger(FeedMetadataUtil.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(FeedMetadataUtil.class.getName());
 
     private static class LocationConstraint {
         int partition;
@@ -158,7 +163,7 @@ public class FeedMetadataUtil {
                         orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(),
                         orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
                 oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
-            } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor
+            } else if ((opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor)
                     && ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
                 // only introduce store before primary index
                 operandId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
@@ -343,7 +348,7 @@ public class FeedMetadataUtil {
             sourceOp = entry.getValue().getKey().getKey();
             if (sourceOp instanceof FeedCollectOperatorDescriptor) {
                 targetOp = entry.getValue().getValue().getKey();
-                if (targetOp instanceof FeedMetaOperatorDescriptor
+                if ((targetOp instanceof FeedMetaOperatorDescriptor)
                         && (((FeedMetaOperatorDescriptor) targetOp).getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
                     connDesc = connectors.get(cid);
                     break;
@@ -458,7 +463,8 @@ public class FeedMetadataUtil {
         return preProcessingRequired;
     }
 
-    public static Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> getPrimaryFeedFactoryAndOutput(
+    @SuppressWarnings("rawtypes")
+    public static Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> getPrimaryFeedFactoryAndOutput(
             Feed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx)
                     throws AlgebricksException {
         // This method needs to be re-visited
@@ -467,13 +473,15 @@ public class FeedMetadataUtil {
         String adapterFactoryClassname = null;
         IAdapterFactory adapterFactory = null;
         ARecordType adapterOutputType = null;
-        Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> feedProps = null;
+        ARecordType metaType = null;
+        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null;
         IDataSourceAdapter.AdapterType adapterType = null;
         try {
             adapterName = feed.getAdapterName();
             Map<String, String> configuration = feed.getAdapterConfiguration();
             configuration.putAll(policyAccessor.getFeedPolicy());
-            adapterOutputType = getOutputType(feed, configuration);
+            adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+            metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
             ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
             // Get adapter from metadata dataset <Metadata dataverse>
             adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -482,8 +490,6 @@ public class FeedMetadataUtil {
             if (adapterEntity == null) {
                 adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
             }
-
-            ExternalDataCompatibilityUtils.addCompatabilityParameters(adapterName, adapterOutputType, configuration);
             if (adapterEntity != null) {
                 adapterType = adapterEntity.getType();
                 adapterFactoryClassname = adapterEntity.getClassname();
@@ -499,26 +505,64 @@ public class FeedMetadataUtil {
                         adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
                 }
-                adapterFactory.configure(configuration, adapterOutputType);
+                adapterFactory.configure(configuration, adapterOutputType, metaType);
             } else {
-                adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
-                        adapterOutputType);
+                adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType,
+                        metaType);
                 adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
             }
-            feedProps = new Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType>(adapterFactory,
-                    adapterOutputType, adapterType);
+            int numOfOutputs = 1;
+            if (metaType != null) {
+                numOfOutputs++;
+            }
+            if (ExternalDataUtils.isChangeFeed(configuration)) {
+                // get number of PKs
+                numOfOutputs += ExternalDataUtils.getNumberOfKeys(configuration);
+            }
+            ISerializerDeserializer[] serdes = new ISerializerDeserializer[numOfOutputs];
+            int i = 0;
+            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(adapterOutputType);
+            if (metaType != null) {
+                serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
+            }
+            if (ExternalDataUtils.isChangeFeed(configuration)) {
+                int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
+                if (metaType != null) {
+                    int[] pkIndicators = ExternalDataUtils.getPKSourceIndicators(configuration);
+                    for (int j = 0; j < pkIndexes.length; j++) {
+                        int aInt = pkIndexes[j];
+                        if (pkIndicators[j] == 0) {
+                            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
+                                    .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
+                        } else if (pkIndicators[j] == 1) {
+                            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
+                                    .getSerializerDeserializer(metaType.getFieldTypes()[aInt]);
+                        } else {
+                            throw new AlgebricksException("a key source indicator can only be 0 or 1");
+                        }
+                    }
+                } else {
+                    for (int aInt : pkIndexes) {
+                        serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
+                                .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
+                    }
+                }
+            }
+            feedProps = new Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType>(adapterFactory,
+                    new RecordDescriptor(serdes), adapterType);
         } catch (Exception e) {
             throw new AlgebricksException("unable to create adapter", e);
         }
         return feedProps;
     }
 
-    public static ARecordType getOutputType(IFeed feed, Map<String, String> configuration) throws Exception {
+    public static ARecordType getOutputType(IFeed feed, Map<String, String> configuration, String key)
+            throws RemoteException, ACIDException, MetadataException {
         ARecordType outputType = null;
-        String fqOutputType = configuration.get(ExternalDataConstants.KEY_TYPE_NAME);
+        String fqOutputType = configuration.get(key);
 
         if (fqOutputType == null) {
-            throw new IllegalArgumentException("No output type specified");
+            return null;
         }
         String[] dataverseAndType = fqOutputType.split("[.]");
         String dataverseName;
@@ -530,9 +574,9 @@ public class FeedMetadataUtil {
         } else if (dataverseAndType.length == 2) {
             dataverseName = dataverseAndType[0];
             datatypeName = dataverseAndType[1];
-        } else
-            throw new IllegalArgumentException(
-                    "Invalid value for the parameter " + ExternalDataConstants.KEY_TYPE_NAME);
+        } else {
+            throw new IllegalArgumentException("Invalid value for the parameter " + key);
+        }
 
         MetadataTransactionContext ctx = null;
         MetadataManager.INSTANCE.acquireReadLatch();
@@ -545,11 +589,15 @@ public class FeedMetadataUtil {
             }
             outputType = (ARecordType) t.getDatatype();
             MetadataManager.INSTANCE.commitTransaction(ctx);
-        } catch (Exception e) {
+        } catch (ACIDException | RemoteException | MetadataException e) {
             if (ctx != null) {
-                MetadataManager.INSTANCE.abortTransaction(ctx);
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (ACIDException | RemoteException e2) {
+                    e.addSuppressed(e2);
+                }
+                throw e;
             }
-            throw e;
         } finally {
             MetadataManager.INSTANCE.releaseReadLatch();
         }
@@ -557,15 +605,15 @@ public class FeedMetadataUtil {
     }
 
     public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor,
-            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+            MetadataTransactionContext mdTxnCtx)
+                    throws AlgebricksException, MetadataException, RemoteException, ACIDException {
         String outputType = null;
         String primaryFeedName = feed.getSourceFeedName();
         Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
         FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
         if (appliedFunction == null) {
-            Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> result = getPrimaryFeedFactoryAndOutput(
-                    primaryFeed, policyAccessor, mdTxnCtx);
-            outputType = result.second.getTypeName();
+            outputType = getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME)
+                    .getDisplayName();
         } else {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
             if (function != null) {
@@ -583,4 +631,10 @@ public class FeedMetadataUtil {
         return outputType;
     }
 
+    public static boolean isChangeFeed(AqlMetadataProvider mdProvider, String dataverse, String feedName)
+            throws AlgebricksException {
+        Feed feed = mdProvider.findFeed(dataverse, feedName);
+        return ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
+    }
+
 }
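
getPrimaryFeedFactoryAndOutput above assembles the intake record descriptor as: record type, then meta type (if declared), then one serde per primary key, where each key's type is drawn from the record or the meta record according to its source indicator (0 = record, 1 = meta). A self-contained sketch of that selection logic, with plain type-name strings in place of serdes (hypothetical names):

    import java.util.ArrayList;
    import java.util.List;

    public class ChangeFeedOutputSketch {

        // Output layout: record type, meta type (if any), then one entry per
        // primary key, resolved by its source indicator.
        static List<String> outputFieldTypes(String recordType, String metaType,
                String[] recordFieldTypes, String[] metaFieldTypes,
                int[] pkIndexes, int[] pkIndicators) {
            List<String> types = new ArrayList<>();
            types.add(recordType);
            if (metaType != null) {
                types.add(metaType);
            }
            for (int j = 0; j < pkIndexes.length; j++) {
                if (pkIndicators[j] == 0) {
                    types.add(recordFieldTypes[pkIndexes[j]]); // key lives in the record
                } else if (pkIndicators[j] == 1) {
                    types.add(metaFieldTypes[pkIndexes[j]]);   // key lives in the meta record
                } else {
                    throw new IllegalArgumentException("a key source indicator can only be 0 or 1");
                }
            }
            return types;
        }

        public static void main(String[] args) {
            System.out.println(outputFieldTypes("TweetType", "TweetMetaType",
                    new String[] { "string", "int64" }, new String[] { "uuid" },
                    new int[] { 0 }, new int[] { 1 }));
            // prints [TweetType, TweetMetaType, uuid]
        }
    }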

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
index 4e8c34c..581d01c 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
@@ -107,15 +107,37 @@ public class DatasetUtils {
 
     public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType)
             throws AlgebricksException {
+        return computeTupleTypeTraits(dataset, itemType, null);
+    }
+
+    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
+            throws AlgebricksException {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("not implemented");
         }
         List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
         int numKeys = partitioningKeys.size();
-        ITypeTraits[] typeTraits = new ITypeTraits[numKeys + 1];
-        for (int i = 0; i < numKeys; i++) {
-            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        ITypeTraits[] typeTraits;
+        if (metaItemType != null) {
+            typeTraits = new ITypeTraits[numKeys + 2];
+            List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            typeTraits[numKeys + 1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
+            for (int i = 0; i < numKeys; i++) {
+                IAType keyType;
+                if (indicator.get(i) == 0) {
+                    keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                } else {
+                    keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
+                }
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+        } else {
+            typeTraits = new ITypeTraits[numKeys + 1];
+            for (int i = 0; i < numKeys; i++) {
+                IAType keyType;
+                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
         }
         typeTraits[numKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
         return typeTraits;
@@ -202,7 +224,7 @@ public class DatasetUtils {
     public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
         List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
         for (int i = 0; i < partitioningKeys.size(); i++) {
-            if (partitioningKeys.get(i).size() == 1 && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
+            if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
                 return i;
             }
         }
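
With a meta part, computeTupleTypeTraits above arranges the tuple as {key traits, record trait, meta trait}, resolving each key against the record or the meta type via its source indicator. A small sketch of the resulting layout, using type-name strings in place of ITypeTraits (illustrative only):

    import java.util.Arrays;
    import java.util.List;

    public class TupleTraitsSketch {

        // Layout: one slot per primary key, then the record, then (when a
        // meta part exists) the meta record in the final slot.
        static String[] tupleFieldTypes(String recordType, String metaType,
                List<String> keyTypes) {
            int numKeys = keyTypes.size();
            String[] layout = new String[numKeys + 1 + (metaType != null ? 1 : 0)];
            for (int i = 0; i < numKeys; i++) {
                layout[i] = keyTypes.get(i);
            }
            layout[numKeys] = recordType;
            if (metaType != null) {
                layout[numKeys + 1] = metaType;
            }
            return layout;
        }

        public static void main(String[] args) {
            System.out.println(Arrays.toString(
                    tupleFieldTypes("TweetType", "TweetMetaType", Arrays.asList("int64"))));
            // prints [int64, TweetType, TweetMetaType]
        }
    }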

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
index 9f8d5d7..d604f35 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -726,6 +726,8 @@ public class AsterixBuiltinFunctions {
 
     public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
             FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
+            FunctionIdentifier.VARARGS);
 
     public static IFunctionInfo getAsterixFunctionInfo(FunctionIdentifier fid) {
         return registeredFunctions.get(fid);
@@ -1034,6 +1036,7 @@ public class AsterixBuiltinFunctions {
 
         // meta() function
         addFunction(META, OptionalOpenARecordTypeComputer.INSTANCE, true);
+        addPrivateFunction(META_KEY, AnyTypeComputer.INSTANCE, false);
 
         addPrivateFunction(COLLECTION_TO_SEQUENCE, CollectionToSequenceTypeComputer.INSTANCE, true);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index 91a67ba..6147276 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.om.visitors.IOMVisitor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -42,6 +43,9 @@ import org.json.JSONObject;
  */
 public class ARecordType extends AbstractComplexType {
 
+    public static final ARecordType FULLY_OPEN_RECORD_TYPE = new ARecordType("OpenRecord", new String[0], new IAType[0],
+            true);
+
     private static final long serialVersionUID = 1L;
     private final String[] fieldNames;
     private final IAType[] fieldTypes;
@@ -71,11 +75,11 @@ public class ARecordType extends AbstractComplexType {
         }
     }
 
-    public final String[] getFieldNames() {
+    public String[] getFieldNames() {
         return fieldNames;
     }
 
-    public final IAType[] getFieldTypes() {
+    public IAType[] getFieldTypes() {
         return fieldTypes;
     }
 
@@ -96,7 +100,7 @@ public class ARecordType extends AbstractComplexType {
         int n = fieldNames.length;
         for (int i = 0; i < n; i++) {
             sb.append("  " + fieldNames[i] + ": " + fieldTypes[i].toString());
-            if (i < n - 1) {
+            if (i < (n - 1)) {
                 sb.append(",\n");
             } else {
                 sb.append("\n");
@@ -144,7 +148,7 @@ public class ARecordType extends AbstractComplexType {
 
     public IAType getSubFieldType(List<String> subFieldName, IAType parent) {
         ARecordType subRecordType = (ARecordType) parent;
-        for (int i = 0; i < subFieldName.size() - 1; i++) {
+        for (int i = 0; i < (subFieldName.size() - 1); i++) {
             subRecordType = (ARecordType) subRecordType.getFieldType(subFieldName.get(i));
         }
         return subRecordType.getFieldType(subFieldName.get(subFieldName.size() - 1));
@@ -182,10 +186,11 @@ public class ARecordType extends AbstractComplexType {
      * @param fieldName
      *            the fieldName whose type is sought
      * @return the field type of the field name if it exists, otherwise null
+     *         NOTE: this method doesn't work for nested fields
      */
     public IAType getFieldType(String fieldName) {
         int fieldPos = getFieldIndex(fieldName);
-        if (fieldPos < 0 || fieldPos >= fieldTypes.length) {
+        if ((fieldPos < 0) || (fieldPos >= fieldTypes.length)) {
             return null;
         }
         return fieldTypes[fieldPos];
@@ -242,7 +247,7 @@ public class ARecordType extends AbstractComplexType {
     public void generateNestedDerivedTypeNames() {
         for (int i = 0; i < fieldTypes.length; i++) {
             IAType fieldType = fieldTypes[i];
-            if (fieldType.getTypeTag().isDerivedType() && fieldType.getTypeName() == null) {
+            if (fieldType.getTypeTag().isDerivedType() && (fieldType.getTypeName() == null)) {
                 AbstractComplexType nestedType = ((AbstractComplexType) fieldType);
                 nestedType.setTypeName(getTypeName() + "_" + fieldNames[i]);
                 nestedType.generateNestedDerivedTypeNames();
@@ -256,7 +261,7 @@ public class ARecordType extends AbstractComplexType {
             return false;
         }
         ARecordType rt = (ARecordType) obj;
-        return isOpen == rt.isOpen && Arrays.deepEquals(fieldNames, rt.fieldNames)
+        return (isOpen == rt.isOpen) && Arrays.deepEquals(fieldNames, rt.fieldNames)
                 && Arrays.deepEquals(fieldTypes, rt.fieldTypes);
     }
 
@@ -264,10 +269,10 @@ public class ARecordType extends AbstractComplexType {
     public int hash() {
         int h = 0;
         for (int i = 0; i < fieldNames.length; i++) {
-            h += 31 * h + fieldNames[i].hashCode();
+            h += (31 * h) + fieldNames[i].hashCode();
         }
         for (int i = 0; i < fieldTypes.length; i++) {
-            h += 31 * h + fieldTypes[i].hashCode();
+            h += (31 * h) + fieldTypes[i].hashCode();
         }
         return h;
     }
@@ -298,4 +303,11 @@ public class ARecordType extends AbstractComplexType {
         return NonTaggedFormatUtil.hasNullableField(rt) ? (int) Math.ceil(rt.getFieldNames().length / 8.0) : 0;
     }
 
+    public List<IAType> getFieldTypes(List<List<String>> fields) throws AlgebricksException {
+        List<IAType> typeList = new ArrayList<>();
+        for (List<String> field : fields) {
+            typeList.add(getSubFieldType(field));
+        }
+        return typeList;
+    }
 }

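Besides the new FULLY_OPEN_RECORD_TYPE constant, ARecordType gains a
getFieldTypes(List<List<String>>) helper that resolves a batch of (possibly
nested) field paths via getSubFieldType, which walks every path component but
the last through nested record types. A self-contained sketch of that walk
(Rec is a toy stand-in for ARecordType, not the real class):

    import java.util.List;
    import java.util.Map;

    public class NestedFieldSketch {
        // toy record type: a name plus a field -> type map
        record Rec(String name, Map<String, Object> fields) {
            Object getFieldType(String f) {
                return fields.get(f);
            }

            // walk all but the last path component through nested records,
            // mirroring ARecordType.getSubFieldType above
            Object getSubFieldType(List<String> path) {
                Rec r = this;
                for (int i = 0; i < (path.size() - 1); i++) {
                    r = (Rec) r.getFieldType(path.get(i));
                }
                return r.getFieldType(path.get(path.size() - 1));
            }
        }

        public static void main(String[] args) {
            Rec address = new Rec("address", Map.of("zip", "string"));
            Rec user = new Rec("user", Map.of("id", "int64", "address", address));
            System.out.println(user.getSubFieldType(List.of("address", "zip")));
            // prints: string
        }
    }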
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 97683d5..83c6e34 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.INullWriter;
@@ -44,29 +45,30 @@ import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 
 public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
-    private PermutingFrameTupleReference key;
+    private final PermutingFrameTupleReference key;
     private MultiComparator keySearchCmp;
     private ArrayTupleBuilder nullTupleBuilder;
-    private INullWriter nullWriter;
+    private final INullWriter nullWriter;
     private ArrayTupleBuilder tb;
     private DataOutput dos;
-    private LSMBTree lsmIndex;
     private RangePredicate searchPred;
     private IIndexCursor cursor;
     private ITupleReference prevTuple;
-    private int numOfPrimaryKeys;
+    private final int numOfPrimaryKeys;
     boolean isFiltered = false;
-    private ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
+    private final ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
     private ArrayTupleBuilder prevRecWithPKWithFilterValue;
     private ARecordType recordType;
     private int presetFieldIndex = -1;
@@ -87,7 +89,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         }
         key.setFieldPermutation(searchKeyPermutations);
         this.numOfPrimaryKeys = numOfPrimaryKeys;
-        if (fieldPermutation.length > numOfPrimaryKeys + 1) {
+        if (filterFieldIndex >= 0) {
             isFiltered = true;
             this.recordType = recordType;
             this.presetFieldIndex = filterFieldIndex;
@@ -109,7 +111,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         writeBuffer = new VSizeFrame(ctx);
         writer.open();
         indexHelper.open();
-        lsmIndex = (LSMBTree) indexHelper.getIndexInstance();
+        index = indexHelper.getIndexInstance();
 
         try {
             nullTupleBuilder = new ArrayTupleBuilder(1);
@@ -126,15 +128,16 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
-                    lsmIndex, ctx);
+                    index, ctx);
 
-            indexAccessor = lsmIndex.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
+            indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx));
-            cursor = createCursor();
+            cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                     .getApplicationContext().getApplicationObject();
-            AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
+            AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
+                    runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
@@ -143,16 +146,18 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
 
     private void resetSearchPredicate(int tupleIndex) {
         key.reset(accessor, tupleIndex);
+        searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
     }
 
-    protected void writeOutput(int tupleIndex) throws Exception {
+    private void writeOutput(int tupleIndex, boolean recordWasInserted) throws IOException {
+        boolean recordWasDeleted = prevTuple != null;
         tb.reset();
         frameTuple.reset(accessor, tupleIndex);
         for (int i = 0; i < frameTuple.getFieldCount(); i++) {
             dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
             tb.addFieldEndOffset();
         }
-        if (prevTuple != null) {
+        if (recordWasDeleted) {
             dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
                     prevTuple.getFieldLength(numOfPrimaryKeys));
             tb.addFieldEndOffset();
@@ -169,7 +174,13 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
                 addNullField();
             }
         }
-        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (recordWasInserted || recordWasDeleted) {
+            FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        }
+    }
+
+    public static boolean isNull(ITupleReference t1, int field) {
+        return t1.getFieldData(field)[t1.getFieldStart(field)] == ATypeTag.SERIALIZED_NULL_TYPE_TAG;
     }
 
     private void addNullField() throws IOException {
@@ -183,12 +194,12 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
-
+        int i = 0;
         try {
-            for (int i = 0; i < tupleCount; i++) {
+            while (i < tupleCount) {
+                boolean recordWasInserted = false;
                 tuple.reset(accessor, i);
                 resetSearchPredicate(i);
-                cursor.reset();
                 lsmAccessor.search(cursor, searchPred);
                 if (cursor.hasNext()) {
                     cursor.next();
@@ -205,21 +216,25 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
                     }
                 } else {
                     prevTuple = null;
+                    cursor.reset();
                 }
-                modCallback.setOp(Operation.INSERT);
-                if (prevTuple == null && i == 0) {
-                    lsmAccessor.insert(tuple);
-                } else {
-                    lsmAccessor.forceInsert(tuple);
+                if (!isNull(tuple, numOfPrimaryKeys)) {
+                    modCallback.setOp(Operation.INSERT);
+                    if ((prevTuple == null) && (i == 0)) {
+                        lsmAccessor.insert(tuple);
+                    } else {
+                        lsmAccessor.forceInsert(tuple);
+                    }
+                    recordWasInserted = true;
                 }
-                writeOutput(i);
+                writeOutput(i, recordWasInserted);
+                i++;
             }
             if (tupleCount > 0) {
                // All tuples have to move forward to maintain the correctness of the transaction pipeline
                 appender.write(writer, true);
             }
-        } catch (Exception e) {
-            e.printStackTrace();
+        } catch (IndexException | IOException | AsterixException e) {
             throw new HyracksDataException(e);
         }
     }
@@ -233,7 +248,8 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
                 prevTuple.getFieldLength(numOfPrimaryKeys));
         // copy the field data from prevTuple
-        prevDos.write(recPointable.getClosedFieldType(recordType, presetFieldIndex).getTypeTag().serialize());
+        byte tag = recPointable.getClosedFieldType(recordType, presetFieldIndex).getTypeTag().serialize();
+        prevDos.write(tag);
         prevDos.write(recPointable.getByteArray(), recPointable.getClosedFieldOffset(recordType, presetFieldIndex),
                 recPointable.getClosedFieldSize(recordType, presetFieldIndex));
         prevRecWithPKWithFilterValue.addFieldEndOffset();
@@ -244,14 +260,10 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
     }
 
     private RangePredicate createSearchPredicate() {
-        keySearchCmp = BTreeUtils.getSearchMultiComparator(lsmIndex.getComparatorFactories(), key);
+        keySearchCmp = BTreeUtils.getSearchMultiComparator(((ITreeIndex) index).getComparatorFactories(), key);
         return new RangePredicate(key, key, true, true, keySearchCmp, keySearchCmp, null, null);
     }
 
-    protected IIndexCursor createCursor() {
-        return indexAccessor.createSearchCursor(false);
-    }
-
     @Override
     public void close() throws HyracksDataException {
         try {

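The reworked nextFrame/writeOutput path above gives the primary upsert three
outcomes per tuple: a delete when the point search finds a previous version,
an insert only when the incoming record field is not the serialized null tag,
and an entry in the output frame only if at least one of the two happened.
A compact sketch of that decision table (NULL_TAG is a placeholder byte; the
real value comes from ATypeTag.SERIALIZED_NULL_TYPE_TAG):

    public class UpsertDecisionSketch {
        // placeholder; AsterixDB's type system defines the actual tag byte
        static final byte NULL_TAG = 0x00;

        // same idea as the static isNull(...) helper above: inspect the
        // type tag at the start of the record field
        static boolean isNull(byte[] field) {
            return field.length > 0 && field[0] == NULL_TAG;
        }

        static String process(byte[] newRecord, boolean prevVersionExists) {
            boolean deleted = prevVersionExists;
            boolean inserted = !isNull(newRecord);
            if (!inserted && !deleted) {
                return "NO-OP";   // null record, nothing to delete: emit nothing
            }
            return inserted ? (deleted ? "UPSERT" : "INSERT") : "DELETE";
        }

        public static void main(String[] args) {
            System.out.println(process(new byte[] { 0x10 }, true));  // UPSERT
            System.out.println(process(new byte[] { 0x00 }, true));  // DELETE
            System.out.println(process(new byte[] { 0x10 }, false)); // INSERT
            System.out.println(process(new byte[] { 0x00 }, false)); // NO-OP
        }
    }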
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
index 65dc83f..05b633d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -20,7 +20,6 @@ package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -88,10 +87,6 @@ public class AsterixLSMSecondaryUpsertOperatorNodePushable extends LSMIndexInser
         return true;
     }
 
-    private boolean isNull(PermutingFrameTupleReference t1) {
-        return t1.getFieldData(0)[t1.getFieldStart(0)] == ATypeTag.SERIALIZED_NULL_TYPE_TAG;
-    }
-
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
@@ -102,8 +97,8 @@ public class AsterixLSMSecondaryUpsertOperatorNodePushable extends LSMIndexInser
                 // if both previous value and new value are null, then we skip
                 tuple.reset(accessor, i);
                 prevValueTuple.reset(accessor, i);
-                isNewNull = isNull(tuple);
-                isPrevValueNull = isNull(prevValueTuple);
+                isNewNull = AsterixLSMPrimaryUpsertOperatorNodePushable.isNull(tuple, 0);
+                isPrevValueNull = AsterixLSMPrimaryUpsertOperatorNodePushable.isNull(prevValueTuple, 0);
                 if (isNewNull && isPrevValueNull) {
                     continue;
                 }


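The secondary-index counterpart above now reuses the primary operator's
static isNull helper and keeps the fast path: when both the previous and the
new secondary-key values are null, the tuple touches nothing in this index
and is skipped. A small sketch of that rule; the delete/insert pairing shown
is the usual secondary-index maintenance pattern, assumed here rather than
quoted from the operator:

    import java.util.ArrayList;
    import java.util.List;

    public class SecondaryUpsertSketch {
        static List<String> ops(boolean isNewNull, boolean isPrevNull) {
            List<String> ops = new ArrayList<>();
            if (isNewNull && isPrevNull) {
                return ops; // neither version carries a key for this index: skip
            }
            if (!isPrevNull) {
                ops.add("delete old entry");
            }
            if (!isNewNull) {
                ops.add("insert new entry");
            }
            return ops;
        }

        public static void main(String[] args) {
            System.out.println(ops(true, true));   // []
            System.out.println(ops(false, true));  // [insert new entry]
            System.out.println(ops(false, false)); // [delete old entry, insert new entry]
        }
    }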