asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/3] incubator-asterixdb-hyracks git commit: Support Change Feeds and Ingestion of Records with MetaData
Date Tue, 15 Mar 2016 23:34:56 GMT
Support Change Feeds and Ingestion of Records with MetaData

This change allows feeds to perform upserts and deletes so that they can
replicate an external data source.
It does so as follows:
1. The adapter produces [PK][Record] tuples, where a null Record denotes a delete.
2. The insert operator is replaced by an upsert operator.
Change-Id: I3749349e2b9f1b03c8b310eb99d3f44d08be77df
Reviewed-on: https://asterix-gerrit.ics.uci.edu/620
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/7dabc19f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/7dabc19f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/7dabc19f

Branch: refs/heads/master
Commit: 7dabc19f75e45f0d1a577b304abd07b1301dab54
Parents: 3f2f539
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Wed Mar 16 01:58:46 2016 +0300
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Tue Mar 15 16:28:42 2016 -0700

----------------------------------------------------------------------
 .../AbstractFunctionCallExpression.java         |   17 +-
 .../algebra/metadata/IMetadataProvider.java     |   10 +-
 .../operators/logical/AssignOperator.java       |   22 +-
 .../logical/InsertDeleteUpsertOperator.java     |   26 +-
 .../operators/logical/ProjectOperator.java      |    5 +-
 .../operators/logical/UnnestOperator.java       |    4 +-
 .../logical/visitors/UsedVariableVisitor.java   |    6 +
 .../operators/physical/BulkloadPOperator.java   |   20 +-
 .../physical/InsertDeleteUpsertPOperator.java   |   16 +-
 .../core/algebra/plan/ALogicalPlanImpl.java     |   25 +-
 .../LogicalOperatorPrettyPrintVisitor.java      |   10 +-
 .../piglet/metadata/PigletMetadataProvider.java |   13 +-
 .../rules/ExtractCommonExpressionsRule.java     |   13 +-
 .../SetAlgebricksPhysicalOperatorsRule.java     |   16 +-
 .../api/dataflow/value/RecordDescriptor.java    |    2 +-
 .../common/comm/io/ArrayTupleBuilder.java       |   11 +-
 .../common/comm/io/FrameTupleAccessor.java      |   63 +-
 .../hyracks/dataflow/std/file/FileSplit.java    |    3 +-
 .../hyracks/dataflow/std/file/CursorTest.java   |  108 +
 .../src/test/resources/data/beer.txt            | 7308 ++++++++++++++++++
 .../IndexBulkLoadOperatorNodePushable.java      |    5 +-
 ...xInsertUpdateDeleteOperatorNodePushable.java |    2 +-
 .../TreeIndexCreateOperatorDescriptor.java      |    4 +-
 .../am/common/tuples/TypeAwareTupleWriter.java  |   10 +-
 .../am/lsm/btree/util/LSMBTreeUtils.java        |    3 +-
 .../impls/AbstractMemoryLSMComponent.java       |    4 +-
 .../lsm/common/impls/LSMIndexSearchCursor.java  |   15 +-
 pom.xml                                         |    3 +-
 28 files changed, 7635 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
index 7f88e2f..e357dea 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
@@ -27,7 +27,6 @@ import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
@@ -110,6 +109,7 @@ public abstract class AbstractFunctionCallExpression extends AbstractLogicalExpr
         return arguments;
     }
 
+    @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("function-call: " + finfo.getFunctionIdentifier() + ", Args:[");
@@ -172,7 +172,8 @@ public abstract class AbstractFunctionCallExpression extends AbstractLogicalExpr
     }
 
     @Override
-    public void getConstraintsForOuterJoin(Collection<FunctionalDependency> fds, Collection<LogicalVariable> outerVars) {
+    public void getConstraintsForOuterJoin(Collection<FunctionalDependency> fds,
+            Collection<LogicalVariable> outerVars) {
         FunctionIdentifier funId = getFunctionIdentifier();
         if (funId.equals(AlgebricksBuiltinFunctions.AND)) {
             for (Mutable<ILogicalExpression> a : arguments) {
@@ -202,22 +203,26 @@ public abstract class AbstractFunctionCallExpression extends AbstractLogicalExpr
         } else {
             AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) obj;
             boolean equal = getFunctionIdentifier().equals(fce.getFunctionIdentifier());
-            if (!equal)
+            if (!equal) {
                 return false;
+            }
             for (int i = 0; i < arguments.size(); i++) {
                 ILogicalExpression argument = arguments.get(i).getValue();
                 ILogicalExpression fceArgument = fce.getArguments().get(i).getValue();
-                if (!argument.equals(fceArgument))
+                if (!argument.equals(fceArgument)) {
                     return false;
+                }
             }
             if (opaqueParameters != null) {
-                if (opaqueParameters.length != fce.opaqueParameters.length)
+                if (opaqueParameters.length != fce.opaqueParameters.length) {
                     return false;
+                }
                 for (int i = 0; i < opaqueParameters.length; i++) {
                     Object opaqueParameter = opaqueParameters[i];
                     Object fceOpaqueParameter = fce.opaqueParameters[i];
-                    if (!opaqueParameter.equals(fceOpaqueParameter))
+                    if (!opaqueParameter.equals(fceOpaqueParameter)) {
                         return false;
+                    }
                 }
             }
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 1ad7fe6..8466ef9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -67,8 +67,9 @@ public interface IMetadataProvider<S, I> {
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
-            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterKeyFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -195,8 +196,9 @@ public interface IMetadataProvider<S, I> {
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
-            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification jobSpec) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
             IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index aebbba8..f491028 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -19,8 +19,8 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
 import java.util.List;
-import org.apache.commons.lang3.mutable.Mutable;
 
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
@@ -42,14 +42,13 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisit
 
 public class AssignOperator extends AbstractAssignOperator {
 
-	private LocalOrderProperty explicitOrderingProperty;
+    private LocalOrderProperty explicitOrderingProperty;
 
     public AssignOperator(List<LogicalVariable> vars, List<Mutable<ILogicalExpression>> exprs) {
         super(vars, exprs);
     }
 
     public AssignOperator(LogicalVariable var, Mutable<ILogicalExpression> expr) {
-        super();
         this.variables.add(var);
         this.expressions.add(expr);
     }
@@ -90,10 +89,8 @@ public class AssignOperator extends AbstractAssignOperator {
         PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
         int n = variables.size();
         for (int i = 0; i < n; i++) {
-            env.setVarType(
-                    variables.get(i),
-                    ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(), ctx.getMetadataProvider(),
-                            env));
+            env.setVarType(variables.get(i), ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(),
+                    ctx.getMetadataProvider(), env));
             if (expressions.get(i).getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                 LogicalVariable var = ((VariableReferenceExpression) expressions.get(i).getValue())
                         .getVariableReference();
@@ -113,11 +110,10 @@ public class AssignOperator extends AbstractAssignOperator {
     }
 
     public LocalOrderProperty getExplicitOrderingProperty() {
-		return explicitOrderingProperty;
-	}
+        return explicitOrderingProperty;
+    }
 
-	public void setExplicitOrderingProperty(
-			LocalOrderProperty explicitOrderingProperty) {
-		this.explicitOrderingProperty = explicitOrderingProperty;
-	}
+    public void setExplicitOrderingProperty(LocalOrderProperty explicitOrderingProperty) {
+        this.explicitOrderingProperty = explicitOrderingProperty;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 607db69..6a74eb9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -49,18 +49,33 @@ public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
     private final Kind operation;
     private final boolean bulkload;
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+    private final List<Mutable<ILogicalExpression>> additionalNonFilteringExpressions;
+    // previous record (for UPSERT)
     private LogicalVariable prevRecordVar;
     private Object prevRecordType;
+    // previous filter (for UPSERT)
     private LogicalVariable prevFilterVar;
     private Object prevFilterType;
 
     public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs,
+            List<Mutable<ILogicalExpression>> additionalNonFilteringExpressions, Kind operation, boolean bulkload) {
+        this.dataSource = dataSource;
+        this.payloadExpr = payloadExpr;
+        this.primaryKeyExprs = primaryKeyExprs;
+        this.operation = operation;
+        this.bulkload = bulkload;
+        this.additionalNonFilteringExpressions = additionalNonFilteringExpressions;
+    }
+
+    public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
             List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
         this.dataSource = dataSource;
         this.payloadExpr = payloadExpr;
         this.primaryKeyExprs = primaryKeyExprs;
         this.operation = operation;
         this.bulkload = bulkload;
+        this.additionalNonFilteringExpressions = null;
     }
 
     @Override
@@ -98,6 +113,11 @@ public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
                 changed |= transform.transform(e);
             }
         }
+        if (additionalNonFilteringExpressions != null) {
+            for (Mutable<ILogicalExpression> e : additionalNonFilteringExpressions) {
+                changed |= transform.transform(e);
+            }
+        }
         return changed;
     }
 
@@ -108,7 +128,7 @@ public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
 
     @Override
     public boolean isMap() {
-        return false;
+        return true;
     }
 
     @Override
@@ -165,6 +185,10 @@ public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
         return bulkload;
     }
 
+    public List<Mutable<ILogicalExpression>> getAdditionalNonFilteringExpressions() {
+        return additionalNonFilteringExpressions;
+    }
+
     public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
         this.additionalFilteringExpressions = additionalFilteringExpressions;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
index ceca63b..672d32c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
@@ -41,12 +41,12 @@ public class ProjectOperator extends AbstractLogicalOperator {
 
     public ProjectOperator(LogicalVariable v) {
         this.variables = new ArrayList<LogicalVariable>(1);
-        this.getVariables().add(v);
+        variables.add(v);
     }
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) {
-        // do nothing
+        // Do nothing
         return false;
     }
 
@@ -83,5 +83,4 @@ public class ProjectOperator extends AbstractLogicalOperator {
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index ecc4772..e90298d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -36,7 +35,8 @@ public class UnnestOperator extends AbstractUnnestNonMapOperator {
     }
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
-            LogicalVariable positionalVariable, Object positionalVariableType, IUnnestingPositionWriter positionWriter) {
+            LogicalVariable positionalVariable, Object positionalVariableType,
+            IUnnestingPositionWriter positionWriter) {
         super(variable, expression, positionalVariable, positionalVariableType, positionWriter);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 6e895bd..2feea5d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -382,6 +382,12 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
                 e.getValue().getUsedVariables(usedVariables);
             }
         }
+        // 4. The Other variables (Not key, Not payload, and Not Filter)
+        if (op.getAdditionalNonFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> e : op.getAdditionalNonFilteringExpressions()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 036ac05..8ac3271 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -52,13 +52,16 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
     private final LogicalVariable payload;
     private final List<LogicalVariable> primaryKeys;
     private final List<LogicalVariable> additionalFilteringKeys;
+    private final List<LogicalVariable> additionalNonFilterVars;
     private final IDataSource<?> dataSource;
 
     public BulkloadPOperator(LogicalVariable payload, List<LogicalVariable> keys,
-           List<LogicalVariable> additionalFilteringKeys, IDataSource<?> dataSource) {
+            List<LogicalVariable> additionalFilteringKeys, List<LogicalVariable> additionalNonFilterVars,
+            IDataSource<?> dataSource) {
         this.payload = payload;
         this.primaryKeys = keys;
         this.additionalFilteringKeys = additionalFilteringKeys;
+        this.additionalNonFilterVars = additionalNonFilterVars;
         this.dataSource = dataSource;
     }
 
@@ -73,8 +76,8 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
-        IPhysicalPropertiesVector physicalProps = dataSource.getPropertiesProvider().computePropertiesVector(
-                scanVariables);
+        IPhysicalPropertiesVector physicalProps = dataSource.getPropertiesProvider()
+                .computePropertiesVector(scanVariables);
         StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
                 physicalProps.getLocalProperties());
         return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
@@ -85,24 +88,24 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
         assert insertDeleteOp.getOperation() == Kind.INSERT;
         assert insertDeleteOp.isBulkload();
-
         IMetadataProvider mp = context.getMetadataProvider();
         IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
         JobSpecification spec = builder.getJobSpec();
         RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getInsertRuntime(
-                dataSource, propagatedSchema, typeEnv, primaryKeys, payload, additionalFilteringKeys,
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getInsertRuntime(dataSource,
+                propagatedSchema, typeEnv, primaryKeys, payload, additionalFilteringKeys, additionalNonFilterVars,
                 inputDesc, context, spec, true);
         builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
         builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
@@ -120,5 +123,4 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
         return false;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index d844f37..2e4b647 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -48,22 +48,22 @@ import org.apache.hyracks.api.job.JobSpecification;
 @SuppressWarnings("rawtypes")
 public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
-    private LogicalVariable payload;
-    private List<LogicalVariable> keys;
-    private IDataSource<?> dataSource;
+    private final LogicalVariable payload;
+    private final List<LogicalVariable> keys;
+    private final IDataSource<?> dataSource;
     private final List<LogicalVariable> additionalFilteringKeys;
+    private final List<LogicalVariable> additionalNonFilteringFields;
     private final Kind operation;
-    private final LogicalVariable prevPayload;
 
     public InsertDeleteUpsertPOperator(LogicalVariable payload, List<LogicalVariable> keys,
             List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource, Kind operation,
-            LogicalVariable prevPayload) {
+            List<LogicalVariable> additionalNonFilteringFields) {
         this.payload = payload;
         this.keys = keys;
         this.dataSource = dataSource;
         this.additionalFilteringKeys = additionalFilteringKeys;
         this.operation = operation;
-        this.prevPayload = prevPayload;
+        this.additionalNonFilteringFields = additionalNonFilteringFields;
     }
 
     @Override
@@ -106,13 +106,13 @@ public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
         if (operation == Kind.INSERT) {
             runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, inputDesc, context, spec, false);
+                    additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec, false);
         } else if (operation == Kind.DELETE) {
             runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
                     additionalFilteringKeys, inputDesc, context, spec);
         } else if (operation == Kind.UPSERT) {
             runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, prevPayload, inputDesc, context, spec);
+                    additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec);
         } else {
             throw new AlgebricksException("Unsupported Operation " + operation);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* Adds a static prettyPrintPlan(ILogicalPlan) helper that renders the plan into a StringBuilder using
  PlanPrettyPrinter.printPlan with a fresh LogicalOperatorPrettyPrintVisitor, starting at indent 0.
* toString() now returns prettyPrintPlan(this), wrapping any AlgebricksException in an IllegalStateException.
* Also adds @Override to getRoots(), imports AlgebricksException and the two pretty-print classes, and
  drops the old author/creation-date header comment.
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
index be84a9f..082c2ce 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
@@ -22,14 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
 
-/*
- * Author: Guangqiang Li
- * Created on Jul 9, 2009
- */
 public class ALogicalPlanImpl implements ILogicalPlan {
     private List<Mutable<ILogicalOperator>> roots;
 
@@ -46,6 +44,7 @@ public class ALogicalPlanImpl implements ILogicalPlan {
         roots.add(root);
     }
 
+    @Override
     public List<Mutable<ILogicalOperator>> getRoots() {
         return roots;
     }
@@ -53,4 +52,20 @@ public class ALogicalPlanImpl implements ILogicalPlan {
     public void setRoots(List<Mutable<ILogicalOperator>> roots) {
         this.roots = roots;
     }
+
+    public static String prettyPrintPlan(ILogicalPlan plan) throws AlgebricksException {
+        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+        StringBuilder buffer = new StringBuilder();
+        PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
+        return buffer.toString();
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return ALogicalPlanImpl.prettyPrintPlan(this);
+        } catch (AlgebricksException e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* In the insert/delete/upsert printer, op.getAdditionalNonFilteringExpressions() is now printed with
  pprintExprList right after the payload expression (only when non-null), before " partitioned by ".
* The UPSERT suffix changes from " out: ([<var>] <-{record-before-upsert}) " to
  " out: ([record-before-upsert:<var>]) ", where <var> is op.getPrevRecordVar().
* addIndent loses its `final` modifier; its signature and body are otherwise unchanged.
* NOTE(review): nothing is appended between the payload and the non-filtering expression list; confirm
  that pprintExprList's own output keeps the two visually separated in printed plans.
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index b85202d..a7ea706 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -379,10 +379,14 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
         StringBuilder buffer = new StringBuilder();
         String header = getIndexOpString(op.getOperation());
         addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
-                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent));
+        if (op.getAdditionalNonFilteringExpressions() != null) {
+            pprintExprList(op.getAdditionalNonFilteringExpressions(), buffer, indent);
+        }
+        buffer.append(" partitioned by ");
         pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
         if (op.getOperation() == Kind.UPSERT) {
-            buffer.append(" out: ([" + op.getPrevRecordVar() + "] <-{record-before-upsert}) ");
+            buffer.append(" out: ([record-before-upsert:" + op.getPrevRecordVar() + "]) ");
         }
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
@@ -445,7 +449,7 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
         return buffer.toString();
     }
 
-    protected static final StringBuilder addIndent(StringBuilder buffer, int level) {
+    protected static StringBuilder addIndent(StringBuilder buffer, int level) {
         for (int i = 0; i < level; ++i) {
             buffer.append(' ');
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* getInsertRuntime: the single additionalNonKeyFields parameter is replaced by additionalFilterKeyFields
  followed by additionalNonFilteringFields (the signature is re-wrapped).
* getUpsertRuntime: additionalNonKeyFields and prevPayload are replaced by additionalFilterFields and
  additionalNonFilteringFields.
* The hunk at line 246 renames the parameter prevAdditionalFilteringKeys to prevAdditionalFilteringFields,
  and an @SuppressWarnings("unchecked") is removed from getScannerRuntime.
* The stub bodies shown in these hunks still return null.
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 8bf3dbb..f586af7 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -78,7 +78,6 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
         return null;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<String> dataSource,
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
@@ -216,8 +215,9 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
-            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterKeyFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -234,8 +234,9 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
-            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification jobSpec) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -246,7 +247,7 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
             ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+            LogicalVariable prevAdditionalFilteringFields, RecordDescriptor inputDesc, JobGenContext context,
             JobSpecification spec) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* Mostly formatting: a blank line inside the import block is removed, the rewritePre signature and the
  AssignOperator construction are re-wrapped, and a blank line is added after the ignoreOps declaration.
* ignoreOps is now created with an initial capacity of 6. This is only a sizing hint; the HashSet still
  grows if more tags are added.
* The explicit (AbstractLogicalOperator) cast on selectOp is dropped when assigning firstOp.
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index e5581dc..11ff4be 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -93,7 +92,8 @@ public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
     private final Map<ILogicalExpression, ExprEquivalenceClass> exprEqClassMap = new HashMap<ILogicalExpression, ExprEquivalenceClass>();
 
     // Set of operators for which common subexpression elimination should not be performed.
-    private static final Set<LogicalOperatorTag> ignoreOps = new HashSet<LogicalOperatorTag>();
+    private static final Set<LogicalOperatorTag> ignoreOps = new HashSet<LogicalOperatorTag>(6);
+
     static {
         ignoreOps.add(LogicalOperatorTag.UNNEST);
         ignoreOps.add(LogicalOperatorTag.UNNEST_MAP);
@@ -110,7 +110,8 @@ public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         exprEqClassMap.clear();
         substVisitor.setContext(context);
         boolean modified = removeCommonExpressions(opRef, context);
@@ -310,14 +311,14 @@ public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
                 selectOp.getInputs().add(new MutableObject<ILogicalOperator>(op.getInputs().get(0).getValue()));
                 op.getInputs().get(0).setValue(selectOp);
                 // Set firstOp to be the select below op, since we want to assign the common subexpr there.
-                firstOp = (AbstractLogicalOperator) selectOp;
+                firstOp = selectOp;
             } else if (firstOp.getInputs().size() > 1) {
                 // Bail for any non-join operator with multiple inputs.
                 return false;
             }
             LogicalVariable newVar = context.newVar();
-            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef
-                    .getValue().cloneExpression()));
+            AssignOperator newAssign = new AssignOperator(newVar,
+                    new MutableObject<ILogicalExpression>(firstExprRef.getValue().cloneExpression()));
             // Place assign below firstOp.
             newAssign.getInputs().add(new MutableObject<ILogicalOperator>(firstOp.getInputs().get(0).getValue()));
             newAssign.setExecutionMode(firstOp.getExecutionMode());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* The hash/external group-by annotation test is only re-wrapped and parenthesized; the condition is unchanged.
* For insert/delete/upsert, the variables of opLoad.getAdditionalNonFilteringExpressions() are collected
  with getKeys into additionalNonFilterVariables. The list stays null when there are no such expressions.
  It is passed to both BulkloadPOperator and InsertDeleteUpsertPOperator.
* InsertDeleteUpsertPOperator no longer receives opLoad.getPrevRecordVar() in this constructor call.
  NOTE(review): confirm that the UPSERT path obtains the previous-record variable some other way.
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8e12ece..6524d87 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -154,8 +154,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     if (gby.getNestedPlans().size() == 1) {
                         ILogicalPlan p0 = gby.getNestedPlans().get(0);
                         if (p0.getRoots().size() == 1) {
-                            if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
-                                    .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
+                            if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE)
+                                    || (gby.getAnnotations()
+                                            .get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
                                 if (!topLevelOp) {
                                     throw new NotImplementedException(
                                             "External hash group-by for nested grouping is not implemented.");
@@ -299,17 +300,22 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     LogicalVariable payload;
                     List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> additionalFilteringKeys = null;
+                    List<LogicalVariable> additionalNonFilterVariables = null;
+                    if (opLoad.getAdditionalNonFilteringExpressions() != null) {
+                        additionalNonFilterVariables = new ArrayList<LogicalVariable>();
+                        getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
+                    }
                     payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getPrimaryKeyExpressions(), keys);
                     if (opLoad.getAdditionalFilteringExpressions() != null) {
                         additionalFilteringKeys = new ArrayList<LogicalVariable>();
                         getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
                     if (opLoad.isBulkload()) {
-                        op.setPhysicalOperator(
-                                new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource()));
+                        op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys,
+                                additionalNonFilterVariables, opLoad.getDataSource()));
                     } else {
                         op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
-                                opLoad.getDataSource(), opLoad.getOperation(), opLoad.getPrevRecordVar()));
+                                opLoad.getDataSource(), opLoad.getOperation(), additionalNonFilterVariables));
                     }
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/RecordDescriptor.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* The class-level @SuppressWarnings on RecordDescriptor changes from "unchecked" to "rawtypes".
  NOTE(review): the field declarations are not part of this hunk; presumably they use raw generic types.
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/RecordDescriptor.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/RecordDescriptor.java
index 80e86e9..2ee4a29 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/RecordDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/RecordDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.hyracks.api.dataflow.value;
 
 import java.io.Serializable;
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings("rawtypes")
 public final class RecordDescriptor implements Serializable {
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* ArrayTupleBuilder is marked @Deprecated, and its Javadoc now points to IFrameFieldAppender.appendField.
* Adds addField(IValueReference), which forwards data.getByteArray(), data.getStartOffset() and
  data.getLength() to addField(byte[], int, int).
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index 84ddab6..e5f7d09 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -26,13 +26,15 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.util.GrowableArray;
 
 /**
  * Array backed tuple builder.
  *
- * @author vinayakb
+ * @deprecated Use IFrameFieldAppender.appendField to append fields directly.
  */
+@Deprecated
 public class ArrayTupleBuilder implements IDataOutputProvider {
     private final GrowableArray fieldData = new GrowableArray();
     private final int[] fEndOffsets;
@@ -166,4 +168,11 @@ public class ArrayTupleBuilder implements IDataOutputProvider {
     public void addFieldEndOffset() {
         fEndOffsets[nextField++] = fieldData.getLength();
     }
+
+    /**
+     * Adds a new field and fills it with the content of the passed value
+     */
+    public void addField(IValueReference data) throws HyracksDataException {
+        addField(data.getByteArray(), data.getStartOffset(), data.getLength());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
----------------------------------------------------------------------
Review fix applied to the patch below (content differs from commit 7dabc19f, so the index blob id is stale):
* The new prettyPrint(ITupleReference, int, int) and prettyPrint(ITupleReference, int[]) overloads opened "["
  but never closed it, and appended "\n" right before System.err.println. They now append "]" instead.
  The line count is unchanged, so the hunk header still holds.
* NOTE(review): the protected prettyPrint helper calls Arrays.sort(recordFields), so the public
  prettyPrint(int, int[]) overload reorders the caller's array in place. Pass a copy if callers reuse it.
* The prettyPrint methods now declare IOException instead of catching HyracksDataException and printing it.
  Record fields print their first byte (dis.readByte()) as "tag->".
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index bab5463..e314cd1 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.dataflow.common.comm.io;
 
 import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -28,6 +29,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
 /**
@@ -41,8 +43,8 @@ import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
  * field slots.
  */
 public class FrameTupleAccessor implements IFrameTupleAccessor {
-    private int tupleCountOffset;
     private final RecordDescriptor recordDescriptor;
+    private int tupleCountOffset;
     private ByteBuffer buffer;
     private int start;
 
@@ -172,7 +174,7 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
      * using IserializerDeserializer can print incorrect results or throw exceptions.
      * A better way yet would be to use record pointable.
      */
-    public void prettyPrint(String prefix, int[] recordFields) {
+    public void prettyPrint(String prefix, int[] recordFields) throws IOException {
         ByteBufferInputStream bbis = new ByteBufferInputStream();
         DataInputStream dis = new DataInputStream(bbis);
         int tc = getTupleCount();
@@ -184,22 +186,61 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
         System.err.println(sb.toString());
     }
 
+    public void prettyPrint(int tIdx, int[] recordFields) throws IOException {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        StringBuilder sb = new StringBuilder();
+        prettyPrint(tIdx, bbis, dis, sb, recordFields);
+        System.err.println(sb.toString());
+    }
+
+    public void prettyPrint(ITupleReference tuple, int fieldsIdx, int descIdx) throws HyracksDataException {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", "
+                + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") ");
+        sb.append("{");
+        ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx));
+        bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx));
+        sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis));
+        sb.append("}");
+        sb.append("]");
+        System.err.println(sb.toString());
+    }
+
+    public void prettyPrint(ITupleReference tuple, int[] descF) throws HyracksDataException {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int j = 0; j < descF.length; ++j) {
+            sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", "
+                    + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") ");
+            sb.append("{");
+            ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j));
+            bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j));
+            sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis));
+            sb.append("}");
+        }
+        sb.append("]");
+        System.err.println(sb.toString());
+    }
+
     protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
-            int[] recordFields) {
+            int[] recordFields) throws IOException {
         Arrays.sort(recordFields);
         sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
         for (int j = 0; j < getFieldCount(); ++j) {
             sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
             sb.append("{");
             bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
-            try {
-                if (Arrays.binarySearch(recordFields, j) >= 0) {
-                    sb.append("a record field: only print using pointable");
-                } else {
-                    sb.append(recordDescriptor.getFields()[j].deserialize(dis));
-                }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
+            if (Arrays.binarySearch(recordFields, j) >= 0) {
+                sb.append("{a record field: only print using pointable:");
+                sb.append("tag->" + dis.readByte() + "}");
+            } else {
+                sb.append(recordDescriptor.getFields()[j].deserialize(dis));
             }
             sb.append("}");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileSplit.java
----------------------------------------------------------------------
Review notes (free text outside the diff hunks):
* This hunk only moves the nodeName field declaration below partition and drops a blank line.
  The visible constructor still assigns this.nodeName.
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileSplit.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileSplit.java
index 592fb9d..b201acd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileSplit.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileSplit.java
@@ -25,11 +25,10 @@ import org.apache.hyracks.api.io.FileReference;
 
 public class FileSplit implements Serializable {
     private static final long serialVersionUID = 1L;
-
-    private final String nodeName;
     private final FileReference file;
     private final int ioDeviceId;
     private final int partition;
+    private final String nodeName;
 
     public FileSplit(String nodeName, FileReference file) {
         this.nodeName = nodeName;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7dabc19f/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
----------------------------------------------------------------------
Review fix applied to the patch below (content differs from commit 7dabc19f, so the index blob id is stale):
* test() used to close `reader` in a finally block without a null check. If opening the file failed, the
  resulting NullPointerException replaced the real failure. The reader is now managed by try-with-resources.
* Exceptions are no longer caught and printed. test() declares `throws Exception`, so JUnit reports the
  original cause. The old `assert (false)` only failed when the JVM ran with -ea.
* The field-count and record-count checks use Assert.assertEquals instead of printing to stderr and calling
  assertTrue(false). The expected record count is now a named constant.
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
new file mode 100644
index 0000000..c067313
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.file;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+
+import org.junit.Assert;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class CursorTest extends TestCase {
+
+    /** Number of data records (header excluded) expected in /data/beer.txt. */
+    private static final int EXPECTED_NUMBER_OF_RECORDS = 7307;
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public CursorTest(final String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(CursorTest.class);
+    }
+
+    /**
+     * Lexes /data/beer.txt with {@link FieldCursorForDelimitedDataParser} and checks that every data record
+     * has as many fields as the header record, and that the expected number of data records is read.
+     * <p>
+     * Exceptions are deliberately not caught, so JUnit reports the test as failed with the original cause.
+     * Try-with-resources closes the reader once it has been opened, even if lexing fails.
+     *
+     * @throws Exception
+     *             if the test resource cannot be located, opened or read
+     */
+    public void test() throws Exception {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+                new FileInputStream(
+                        Paths.get(getClass().getResource("/data/beer.txt").toURI()).toAbsolutePath().toString()),
+                StandardCharsets.UTF_8))) {
+            final FieldCursorForDelimitedDataParser cursor = new FieldCursorForDelimitedDataParser(reader, ',', '"');
+            // The first record is the header; its field count is the width every data record must have.
+            cursor.nextRecord();
+            int numOfFields = 0;
+            while (cursor.nextField()) {
+                numOfFields++;
+            }
+
+            int recordNumber = 0;
+            while (cursor.nextRecord()) {
+                int fieldNumber = 0;
+                while (cursor.nextField()) {
+                    // Field contains double quotes: call eliminateDoubleQuote and pull fEnd back by doubleQuoteCount.
+                    if (cursor.isDoubleQuoteIncludedInThisField) {
+                        cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                        cursor.fEnd -= cursor.doubleQuoteCount;
+                    }
+                    fieldNumber++;
+                }
+                Assert.assertEquals("Unexpected number of fields in record " + recordNumber, numOfFields,
+                        fieldNumber);
+                recordNumber++;
+            }
+            Assert.assertEquals("Unexpected number of records", EXPECTED_NUMBER_OF_RECORDS, recordNumber);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message