asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [2/2] incubator-asterixdb-hyracks git commit: ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes.
Date Wed, 16 Mar 2016 06:20:21 GMT
ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes.

Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/718
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/f6d1e054
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/f6d1e054
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/f6d1e054

Branch: refs/heads/master
Commit: f6d1e0544d3387865d1c394732525d1140bb2496
Parents: 1a2e95f
Author: Yingyi Bu <yingyi@couchbase.com>
Authored: Tue Mar 15 17:49:07 2016 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Tue Mar 15 23:15:12 2016 -0700

----------------------------------------------------------------------
 .../api/HeuristicCompilerFactoryBuilder.java    | 21 ++++-----
 .../core/algebra/base/ILogicalOperator.java     |  6 +--
 .../core/algebra/base/IOptimizationContext.java | 47 +++++++++++---------
 .../core/algebra/base/IPhysicalOperator.java    |  6 ++-
 .../logical/AbstractLogicalOperator.java        |  4 +-
 .../physical/AbstractHashJoinPOperator.java     | 10 +++--
 .../AbstractPreclusteredGroupByPOperator.java   |  5 ++-
 .../physical/AbstractScanPOperator.java         |  3 +-
 .../physical/AbstractStableSortPOperator.java   |  2 +-
 .../operators/physical/AggregatePOperator.java  |  2 +-
 .../operators/physical/AssignPOperator.java     |  2 +-
 .../operators/physical/BroadcastPOperator.java  |  2 +-
 .../operators/physical/BulkloadPOperator.java   |  2 +-
 .../physical/DistributeResultPOperator.java     |  2 +-
 .../physical/EmptyTupleSourcePOperator.java     |  2 +-
 .../physical/ExternalGroupByPOperator.java      | 21 +++++----
 .../HashPartitionExchangePOperator.java         |  2 +-
 .../HashPartitionMergeExchangePOperator.java    |  2 +-
 .../physical/IndexBulkloadPOperator.java        |  2 +-
 .../IndexInsertDeleteUpsertPOperator.java       |  2 +-
 .../physical/InsertDeleteUpsertPOperator.java   |  2 +-
 .../operators/physical/IntersectPOperator.java  |  9 ++--
 .../physical/MaterializePOperator.java          |  2 +-
 .../operators/physical/NLJoinPOperator.java     | 25 ++++++-----
 .../physical/NestedTupleSourcePOperator.java    |  2 +-
 .../physical/OneToOneExchangePOperator.java     |  2 +-
 .../physical/PreSortedDistinctByPOperator.java  | 15 ++++---
 .../physical/RandomMergeExchangePOperator.java  |  2 +-
 .../physical/RandomPartitionPOperator.java      |  2 +-
 .../physical/RangePartitionMergePOperator.java  |  2 +-
 .../physical/RangePartitionPOperator.java       |  2 +-
 .../operators/physical/ReplicatePOperator.java  |  2 +-
 .../physical/RunningAggregatePOperator.java     |  2 +-
 .../operators/physical/SinkPOperator.java       |  2 +-
 .../operators/physical/SinkWritePOperator.java  |  2 +-
 .../physical/SortGroupByPOperator.java          |  2 +-
 .../physical/SortMergeExchangePOperator.java    |  2 +-
 .../physical/StreamLimitPOperator.java          |  2 +-
 .../physical/StreamProjectPOperator.java        |  2 +-
 .../physical/StreamSelectPOperator.java         |  2 +-
 .../StringStreamingScriptPOperator.java         |  2 +-
 .../operators/physical/SubplanPOperator.java    |  2 +-
 .../operators/physical/TokenizePOperator.java   |  2 +-
 .../operators/physical/UnionAllPOperator.java   |  2 +-
 .../physical/WriteResultPOperator.java          |  2 +-
 .../properties/DefaultNodeGroupDomain.java      | 39 +++++++++++++---
 .../properties/IPhysicalPropertiesVector.java   |  1 -
 .../properties/OrderedPartitionedProperty.java  |  4 +-
 .../core/algebra/properties/PropertiesUtil.java | 12 ++---
 .../properties/StructuralPropertiesVector.java  |  4 +-
 .../UnorderedPartitionedProperty.java           |  4 +-
 .../algebricks/core/jobgen/impl/JobBuilder.java | 18 +++-----
 .../base/AlgebricksOptimizationContext.java     | 17 +++++--
 .../base/IOptimizationContextFactory.java       |  3 +-
 .../piglet/compiler/PigletCompiler.java         | 21 +++++----
 .../rules/EnforceStructuralPropertiesRule.java  | 12 +++--
 56 files changed, 215 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 0d899b9..1d3a55c 100644
--- a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -51,11 +52,11 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil
                 IExpressionEvalSizeComputer expressionEvalSizeComputer,
                 IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
                 IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-                PhysicalOptimizationConfig physicalOptimizationConfig) {
+                PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
             LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new LogicalOperatorPrettyPrintVisitor();
             return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
-                    physicalOptimizationConfig, prettyPrintVisitor);
+                    physicalOptimizationConfig, clusterLocations, prettyPrintVisitor);
         }
     }
 
@@ -77,7 +78,7 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil
                     int varCounter) {
                 final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
                         expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                        nullableTypeComputer, physicalOptimizationConfig);
+                        nullableTypeComputer, physicalOptimizationConfig, clusterLocations);
                 oc.setMetadataDeclarations(metadata);
                 final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
                 return new ICompiler() {
@@ -92,13 +93,13 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil
                             IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
                         AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
                         JobGenContext context = new JobGenContext(null, metadata, appContext,
-                                serializerDeserializerProvider, hashFunctionFactoryProvider,
-                                hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider,
-                                binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
-                                nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
-                                expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, predEvaluatorFactoryProvider,
-                                physicalOptimizationConfig.getFrameSize(), clusterLocations);
+                                serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+                                comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
+                                binaryIntegerInspectorFactory, printerProvider, nullWriterFactory,
+                                normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer,
+                                nullableTypeComputer, oc, expressionEvalSizeComputer, partialAggregationTypeComputer,
+                                predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(),
+                                clusterLocations);
 
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, null, jobEventListenerFactory);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 9d68fae..707a7db 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -66,7 +65,7 @@ public interface ILogicalOperator {
 
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     // variables
 
@@ -88,7 +87,8 @@ public interface ILogicalOperator {
      * @return for each child, one vector of required physical properties
      */
 
-    public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties);
+    public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties,
+            IOptimizationContext context);
 
     /**
      * @return the physical properties that this operator delivers, based on

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 6badac7..328697d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
@@ -35,52 +36,54 @@ import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConf
 public interface IOptimizationContext extends ITypingContext, IVariableContext {
 
     @Override
-    public abstract IMetadataProvider<?, ?> getMetadataProvider();
+    public IMetadataProvider<?, ?> getMetadataProvider();
 
-    public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
+    public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
 
-    public abstract boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
+    public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
 
-    public abstract void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
+    public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
 
     /*
      * returns true if op1 and op2 have already been compared
      */
-    public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
+    public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
 
-    public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
+    public void removeFromAlreadyCompared(ILogicalOperator op1);
 
-    public abstract void addNotToBeInlinedVar(LogicalVariable var);
+    public void addNotToBeInlinedVar(LogicalVariable var);
 
-    public abstract boolean shouldNotBeInlined(LogicalVariable var);
+    public boolean shouldNotBeInlined(LogicalVariable var);
 
-    public abstract void addPrimaryKey(FunctionalDependency pk);
+    public void addPrimaryKey(FunctionalDependency pk);
 
-    public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable var);
+    public List<LogicalVariable> findPrimaryKey(LogicalVariable var);
 
-    public abstract void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
+    public void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
 
-    public abstract Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op);
+    public Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op);
 
-    public abstract void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList);
+    public void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList);
 
-    public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
+    public List<FunctionalDependency> getFDList(ILogicalOperator op);
 
     public void clearAllFDAndEquivalenceClasses();
 
-    public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
+    public void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
 
-    public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
+    public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
 
-    public abstract IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
+    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
 
-    public abstract IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
+    public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
 
-    public abstract IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory();
+    public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory();
 
-    public abstract PhysicalOptimizationConfig getPhysicalOptimizationConfig();
+    public PhysicalOptimizationConfig getPhysicalOptimizationConfig();
 
-    public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
+    public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
 
-    public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+    public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+
+    public INodeDomain getComputationNodeDomain();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a3ef5d9..8c0ab2f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -35,10 +35,12 @@ public interface IPhysicalOperator {
      * @param reqdByParent
      *            parent's requirements, which are not enforced for now, as we
      *            only explore one plan
+     * @param context
+     *            the optimization context
      * @return for each child, one vector of required physical properties
      */
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent);
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context);
 
     /**
      * @return the physical properties that this operator delivers, based on
@@ -51,7 +53,7 @@ public interface IPhysicalOperator {
 
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public void disableJobGenBelowMe();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 5e9eef6..2410ca0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -102,8 +102,8 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator {
 
     @Override
     public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren(
-            IPhysicalPropertiesVector requiredProperties) {
-        return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties);
+            IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) {
+        return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 09d2253..7077014 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -86,7 +86,7 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
         // In a cost-based optimizer, we would also try to propagate the
         // parent's partitioning requirements.
@@ -97,12 +97,14 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
         if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
             switch (partitioningType) {
                 case PAIRWISE: {
-                    pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), null);
-                    pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), null);
+                    pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch),
+                            context.getComputationNodeDomain());
+                    pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch),
+                            context.getComputationNodeDomain());
                     break;
                 }
                 case BROADCAST: {
-                    pp2 = new BroadcastPartitioningProperty(null);
+                    pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
                     break;
                 }
                 default: {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 2c93cd4..2d00e4c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -138,7 +138,7 @@ public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysi
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
         List<ILocalStructuralProperty> localProps = null;
 
@@ -233,7 +233,8 @@ public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysi
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
+                    context.getComputationNodeDomain());
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 2ab24d2..5159ac5 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 
@@ -26,7 +27,7 @@ public abstract class AbstractScanPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 4b13645..3759956 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -69,7 +69,7 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
         if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
             if (orderProp == null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 2a2eb1e..3242fa0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -72,7 +72,7 @@ public class AggregatePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AggregateOperator aggOp = (AggregateOperator) op;
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
         if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 5b6c40a..5aed63e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -63,7 +63,7 @@ public class AssignPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index 03a8666..69bfe2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -61,7 +61,7 @@ public class BroadcastPOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 8ac3271..dda5456 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -72,7 +72,7 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 5823bc0..b3e8385 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -68,7 +68,7 @@ public class DistributeResultPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         DistributeResultOperator write = (DistributeResultOperator) op;
         IDataSink sink = write.getDataSink();
         IPartitioningProperty pp = sink.getPartitioningProperty();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 4e3a5bc..dcb7e15 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -56,7 +56,7 @@ public class EmptyTupleSourcePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index b15ea0b..7772620 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -135,12 +135,12 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            pv[0] = new StructuralPropertiesVector(
-                    new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnSet), null), null);
+            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
+                    new ListSet<LogicalVariable>(columnSet), context.getComputationNodeDomain()), null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
@@ -206,13 +206,16 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
         }
 
         List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList())
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
             keyAndDecVariables.add(p.first);
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList())
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
             keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+        }
 
-        for (LogicalVariable var : keyAndDecVariables)
+        for (LogicalVariable var : keyAndDecVariables) {
             aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+        }
 
         compileSubplans(inputSchemas[0], gby, opSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -236,10 +239,12 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
         for (Object type : intermediateTypes) {
             aggOpInputEnv.setVarType(usedVars.get(i++), type);
         }
-        for (LogicalVariable keyVar : keyAndDecVariables)
+        for (LogicalVariable keyVar : keyAndDecVariables) {
             localInputSchemas[0].addVariable(keyVar);
-        for (LogicalVariable usedVar : usedVars)
+        }
+        for (LogicalVariable usedVar : usedVars) {
             localInputSchemas[0].addVariable(usedVar);
+        }
         for (i = 0; i < n; i++) {
             AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
                     .get(i).getValue();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 0ff1e47..34c707b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -76,7 +76,7 @@ public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 21be272..17322b6 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -100,7 +100,7 @@ public class HashPartitionMergeExchangePOperator extends AbstractExchangePOperat
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
         List<OrderColumn> columns = new ArrayList<OrderColumn>();
         for (OrderColumn oc : orderColumns) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index b837bfa..50ab6aa 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -83,7 +83,7 @@ public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 4702361..f29fd6f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -87,7 +87,7 @@ public class IndexInsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 2e4b647..f0bd603 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,7 +80,7 @@ public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
         scanVariables.add(new LogicalVariable(-1));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index b6d0f1f..5f43c3e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -21,7 +21,9 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
@@ -40,8 +42,8 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
@@ -61,7 +63,7 @@ public class IntersectPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         IntersectOperator intersectOp = (IntersectOperator) iop;
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()];
         for (int i = 0; i < intersectOp.getNumInput(); i++) {
@@ -73,7 +75,8 @@ public class IntersectPOperator extends AbstractPhysicalOperator {
             localProps.add(new LocalOrderProperty(orderColumns));
             IPartitioningProperty pp = null;
             if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-                pp = new RandomPartitioningProperty(null);
+                Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputVariables(i));
+                pp = new UnorderedPartitionedProperty(partitioningVariables, null);
             }
             pv[i] = new StructuralPropertiesVector(pp, localProps);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
index a154d91..c55a4ae 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -55,7 +55,7 @@ public class MaterializePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 7ff15d7..2622ae3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -113,13 +113,14 @@ public class NLJoinPOperator extends AbstractJoinPOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         if (partitioningType != JoinPartitioningType.BROADCAST) {
             throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
         }
 
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
-        pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+        pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+                null);
         pv[1] = new StructuralPropertiesVector(null, null);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
@@ -225,10 +226,11 @@ public class NLJoinPOperator extends AbstractJoinPOperator {
             }
             boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
                     p.getLength());
-            if (result)
+            if (result) {
                 return 0;
-            else
+            } else {
                 return 1;
+            }
         }
     }
 
@@ -256,28 +258,31 @@ public class NLJoinPOperator extends AbstractJoinPOperator {
         @Override
         public byte[] getFieldData(int fIdx) {
             int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount)
+            if (fIdx < leftFieldCount) {
                 return refLeft.getFieldData(fIdx);
-            else
+            } else {
                 return refRight.getFieldData(fIdx - leftFieldCount);
+            }
         }
 
         @Override
         public int getFieldStart(int fIdx) {
             int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount)
+            if (fIdx < leftFieldCount) {
                 return refLeft.getFieldStart(fIdx);
-            else
+            } else {
                 return refRight.getFieldStart(fIdx - leftFieldCount);
+            }
         }
 
         @Override
         public int getFieldLength(int fIdx) {
             int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount)
+            if (fIdx < leftFieldCount) {
                 return refLeft.getFieldLength(fIdx);
-            else
+            } else {
                 return refRight.getFieldLength(fIdx - leftFieldCount);
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 7535c82..e8cc05f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -89,7 +89,7 @@ public class NestedTupleSourcePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
index 27a5357..818e1ec 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -51,7 +51,7 @@ public class OneToOneExchangePOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index df65906..fe11f64 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -78,7 +78,7 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
         List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
         List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
@@ -89,7 +89,8 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
+                    context.getComputationNodeDomain());
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -98,7 +99,7 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
@@ -119,11 +120,11 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
             keysAndDecs[i + keys.length] = fdColumns[i];
         }
 
-        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-                columnList, context.getTypeEnvironment(op), context);
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
+                .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context);
         IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
-        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
-                aggFactories, keysAndDecs);
+        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories,
+                keysAndDecs);
 
         RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
                 context);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
index bfe8b36..e11a64f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -51,7 +51,7 @@ public class RandomMergeExchangePOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index af0087d..7237d24 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -85,7 +85,7 @@ public class RandomPartitionPOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
index d47c31f..1f70f2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -105,7 +105,7 @@ public class RangePartitionMergePOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
         List<OrderColumn> columns = new ArrayList<OrderColumn>();
         for (OrderColumn oc : partitioningFields) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index dca3d97..9046ce2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -85,7 +85,7 @@ public class RangePartitionPOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 19fb5d4..14a8f16 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -50,7 +50,7 @@ public class ReplicatePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 2c842e3..8e4ca18 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -63,7 +63,7 @@ public class RunningAggregatePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 6246ad2..71acecf 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -75,7 +75,7 @@ public class SinkPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
       return emptyUnaryRequirements(op.getInputs().size());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 0b100ee..35f9444 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -67,7 +67,7 @@ public class SinkWritePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         WriteOperator write = (WriteOperator) op;
         IDataSink sink = write.getDataSink();
         IPartitioningProperty pp = sink.getPartitioningProperty();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index 7465711..c08ff85 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -138,7 +138,7 @@ public class SortGroupByPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index f264284..81f6e6b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -121,7 +121,7 @@ public class SortMergeExchangePOperator extends AbstractExchangePOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
         localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
         StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index f47e671..99be356 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -71,7 +71,7 @@ public class StreamLimitPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
         if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 326172f..184cbbc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -49,7 +49,7 @@ public class StreamProjectPOperator extends AbstractPropagatePropertiesForUsedVa
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index 2c40b3b..9b35922 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -54,7 +54,7 @@ public class StreamSelectPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index bd46230..1f5159d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -49,7 +49,7 @@ public class StringStreamingScriptPOperator extends AbstractPropagatePropertiesF
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index aa5672e..9fc0dd4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -77,7 +77,7 @@ public class SubplanPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index 98427fe..557a657 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -66,7 +66,7 @@ public class TokenizePOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 2c407b7..e6517d0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -64,7 +64,7 @@ public class UnionAllPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector pv0 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
         StructuralPropertiesVector pv1 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
         return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 },

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 9c8ddc3..7ec1914 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,7 +80,7 @@ public class WriteResultPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
         scanVariables.add(new LogicalVariable(-1));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 908dcc1..719c70e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,26 +18,53 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.properties;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+
 public class DefaultNodeGroupDomain implements INodeDomain {
 
-    private String groupName;
+    private List<String> nodes = new ArrayList<>();
+
+    public DefaultNodeGroupDomain(List<String> nodes) {
+        this.nodes.addAll(nodes);
+    }
+
+    public DefaultNodeGroupDomain(DefaultNodeGroupDomain domain) {
+        this.nodes.addAll(domain.nodes);
+    }
 
-    public DefaultNodeGroupDomain(String groupName) {
-        this.groupName = groupName;
+    public DefaultNodeGroupDomain(AlgebricksPartitionConstraint clusterLocations) {
+        if (clusterLocations.getPartitionConstraintType() == PartitionConstraintType.ABSOLUTE) {
+            AlgebricksAbsolutePartitionConstraint absPc = (AlgebricksAbsolutePartitionConstraint) clusterLocations;
+            String[] locations = absPc.getLocations();
+            for (String location : locations) {
+                nodes.add(location);
+            }
+        } else {
+            throw new IllegalStateException("A node domain can only take absolute location constraints.");
+        }
     }
 
     @Override
     public boolean sameAs(INodeDomain domain) {
-        return true;
+        if (!(domain instanceof DefaultNodeGroupDomain)) {
+            return false;
+        }
+        DefaultNodeGroupDomain nodeDomain = (DefaultNodeGroupDomain) domain;
+        return nodes.equals(nodeDomain.nodes);
     }
 
     @Override
     public String toString() {
-        return "AsterixDomain(" + groupName + ")";
+        return nodes.toString();
     }
 
     @Override
     public Integer cardinality() {
-        return null;
+        return nodes.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6d1e054/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
index d872491..e298691 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
@@ -31,7 +31,6 @@ public interface IPhysicalPropertiesVector {
     public List<ILocalStructuralProperty> getLocalProperties();
 
     /**
-     *
      * @param reqd
      *            vector of required properties
      * @param equivalenceClasses



Mime
View raw message