asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Taewoo Kim (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1736: Remove Grace Hash Join and Hybrid Hash Join ...
Date Sat, 19 Nov 2016 21:37:50 GMT
Taewoo Kim has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1353

Change subject: ASTERIXDB-1736: Remove Grace Hash Join and Hybrid Hash Join (not being used)
......................................................................

ASTERIXDB-1736: Remove Grace Hash Join and Hybrid Hash Join (not being used)

 - Removed Grace Hash Join and Hybrid Hash Join that are not currently being used
   since we always use Optimized Hybrid Hash Join.

Change-Id: I16e9e4c73d7851f18a48c2715a6bc5c903b74eba
---
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
M hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
9 files changed, 5 insertions(+), 2,165 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/53/1353/1

diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 248bc4f..fe82f68 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -46,7 +46,6 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
@@ -56,32 +55,28 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
     private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
-    private final int aveRecordsPerFrame;
     private final double fudgeFactor;
 
     private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName());
 
     public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
             List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
-            int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
+            int memSizeInFrames, int maxInputSizeInFrames, double fudgeFactor) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.memSizeInFrames = memSizeInFrames;
         this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
-        this.aveRecordsPerFrame = aveRecordsPerFrame;
         this.fudgeFactor = fudgeFactor;
 
         LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
                 + partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
                 + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames="
-                + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor="
-                + fudgeFactor + ".");
+                + maxInputSizeInFrames + ", double fudgeFactor=" + fudgeFactor + ".");
     }
 
     @Override
@@ -114,8 +109,6 @@
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
         int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper
-                .variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
         IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch,
                 env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
@@ -135,58 +128,20 @@
                 propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
-        boolean optimizedHashJoin = true;
         for (IBinaryHashFunctionFamily family : hashFunFamilies) {
             if (family == null) {
-                optimizedHashJoin = false;
-                break;
+                throw new AlgebricksException("A hash function should be assigned for each key variable.");
             }
         }
 
-        if (optimizedHashJoin) {
-            opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
-                    comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
-        } else {
-            opDesc = generateHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFactories,
-                    comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
-        }
+        opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
+                comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
         ILogicalOperator src1 = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src1, 0, op, 0);
         ILogicalOperator src2 = op.getInputs().get(1).getValue();
         builder.contributeGraphEdge(src2, 0, op, 1);
-    }
-
-    private IOperatorDescriptor generateHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
-            int[] keysLeft, int[] keysRight, IBinaryHashFunctionFactory[] hashFunFactories,
-            IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
-            RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
-        IOperatorDescriptor opDesc;
-        try {
-            switch (kind) {
-                case INNER:
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                            aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                            comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
-                    break;
-                case LEFT_OUTER:
-                    IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1]
-                            .getSize()];
-                    for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                        nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                    }
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                            aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                            comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories);
-                    break;
-                default:
-                    throw new NotImplementedException();
-            }
-        } catch (HyracksDataException e) {
-            throw new AlgebricksException(e);
-        }
-        return opDesc;
     }
 
     private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3332836..f4c6134 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -24,7 +24,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -95,7 +94,6 @@
         op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
                 context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
                 context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput(),
-                context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(),
                 context.getPhysicalOptimizationConfig().getFudgeFactor()));
         if (partitioningType == JoinPartitioningType.BROADCAST) {
             hybridToInMemHashJoin(op, context);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
deleted file mode 100644
index 2f7b1c2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-
-public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final int RPARTITION_ACTIVITY_ID = 0;
-    private static final int SPARTITION_ACTIVITY_ID = 1;
-    private static final int JOIN_ACTIVITY_ID = 2;
-
-    private static final long serialVersionUID = 1L;
-    private final int[] keys0;
-    private final int[] keys1;
-    private final int inputsize0;
-    private final int recordsPerFrame;
-    private final int memsize;
-    private final double factor;
-    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
-    private final boolean isLeftOuter;
-    private final IMissingWriterFactory[] nullWriterFactories1;
-
-    public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
-            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) {
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.recordsPerFrame = recordsPerFrame;
-        this.factor = factor;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.predEvaluatorFactory = predEvalFactory;
-        this.isLeftOuter = false;
-        this.nullWriterFactories1 = null;
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
-            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1,
-            IPredicateEvaluatorFactory predEvalFactory) {
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.recordsPerFrame = recordsPerFrame;
-        this.factor = factor;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.predEvaluatorFactory = predEvalFactory;
-        this.isLeftOuter = isLeftOuter;
-        this.nullWriterFactories1 = nullWriterFactories1;
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        ActivityId rpartAid = new ActivityId(odId, RPARTITION_ACTIVITY_ID);
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(rpartAid, keys0);
-        ActivityId spartAid = new ActivityId(odId, SPARTITION_ACTIVITY_ID);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(spartAid, keys1);
-        JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID), rpartAid, spartAid);
-
-        builder.addActivity(this, rpart);
-        builder.addSourceEdge(0, rpart, 0);
-
-        builder.addActivity(this, spart);
-        builder.addSourceEdge(1, spart, 0);
-
-        builder.addActivity(this, join);
-        builder.addBlockingEdge(rpart, spart);
-        builder.addBlockingEdge(spart, join);
-
-        builder.addTargetEdge(0, join, 0);
-    }
-
-    public int getMemorySize() {
-        return memsize;
-    }
-
-    private class HashPartitionActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-        private int[] keys;
-
-        public HashPartitionActivityNode(ActivityId id, int[] keys) {
-            super(id);
-            this.keys = keys;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition),
-                    keys, hashFunctionFactories, comparatorFactories,
-                    (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions)),
-                    recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
-        }
-    }
-
-    private class JoinActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId rpartAid;
-
-        private final ActivityId spartAid;
-
-        public JoinActivityNode(ActivityId id, ActivityId rpartAid, ActivityId spartAid) {
-            super(id);
-            this.rpartAid = rpartAid;
-            this.spartAid = spartAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
-            int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-            final IPredicateEvaluator predEvaluator = predEvaluatorFactory == null ? null
-                    : predEvaluatorFactory.createPredicateEvaluator();
-
-            return new GraceHashJoinOperatorNodePushable(ctx,
-                    new TaskId(new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition),
-                    new TaskId(new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition), recordsPerFrame,
-                    factor, keys0, keys1, hashFunctionFactories, comparatorFactories, nullWriterFactories1, rd1, rd0,
-                    recordDescriptors[0], numPartitions, predEvaluator, isLeftOuter);
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
deleted file mode 100644
index 2de8e6c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
-
-class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-    private final IHyracksTaskContext ctx;
-    private final Object state0Id;
-    private final Object state1Id;
-    private final int[] keys0;
-    private final int[] keys1;
-    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IMissingWriterFactory[] nonMatchWriterFactories;
-    private final RecordDescriptor rd0;
-    private final RecordDescriptor rd1;
-    private final int recordsPerFrame;
-    private final double factor;
-    private final int numPartitions;
-    private final boolean isLeftOuter;
-    private final IPredicateEvaluator predEvaluator;
-
-    GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
-            IBinaryComparatorFactory[] comparatorFactories, IMissingWriterFactory[] nullWriterFactories,
-            RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
-            IPredicateEvaluator predEval, boolean isLeftOuter) {
-        this.ctx = ctx;
-        this.state0Id = state0Id;
-        this.state1Id = state1Id;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.nonMatchWriterFactories = nullWriterFactories;
-        this.rd0 = rd0;
-        this.rd1 = rd1;
-        this.numPartitions = numPartitions;
-        this.recordsPerFrame = recordsPerFrame;
-        this.factor = factor;
-        this.predEvaluator = predEval;
-        this.isLeftOuter = isLeftOuter;
-    }
-
-    @Override
-    public void initialize() throws HyracksDataException {
-        GraceHashJoinPartitionState rState = (GraceHashJoinPartitionState) ctx.getStateObject(state0Id);
-        GraceHashJoinPartitionState sState = (GraceHashJoinPartitionState) ctx.getStateObject(state1Id);
-        RunFileWriter[] buildWriters = sState.getRunWriters();
-        RunFileWriter[] probeWriters = rState.getRunWriters();
-
-        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
-                new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
-        ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
-                new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
-
-        final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
-        if (isLeftOuter) {
-            for (int i = 0; i < nonMatchWriterFactories.length; i++) {
-                nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter();
-            }
-        }
-        try {
-            writer.open();// open for probe
-            IFrame buffer = new VSizeFrame(ctx);
-            // buffer
-            int tableSize = (int) (numPartitions * recordsPerFrame * factor);
-            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-
-            for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                RunFileWriter buildWriter = buildWriters[partitionid];
-                RunFileWriter probeWriter = probeWriters[partitionid];
-                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                    continue;
-                }
-                table.reset();
-                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpcRep0,
-                        new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators),
-                        isLeftOuter, nullWriters1, table, predEvaluator);
-
-                // build
-                if (buildWriter != null) {
-                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                    buildReader.open();
-                    while (buildReader.nextFrame(buffer)) {
-                        ByteBuffer copyBuffer = ctx.allocateFrame(buffer.getFrameSize());
-                        FrameUtils.copyAndFlip(buffer.getBuffer(), copyBuffer);
-                        joiner.build(copyBuffer);
-                        buffer.reset();
-                    }
-                    buildReader.close();
-                }
-
-                // probe
-                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                probeReader.open();
-                while (probeReader.nextFrame(buffer)) {
-                    joiner.join(buffer.getBuffer(), writer);
-                    buffer.reset();
-                }
-                probeReader.close();
-                joiner.closeJoin(writer);
-            }
-        } catch (Throwable th) {
-            writer.fail();
-            throw new HyracksDataException(th);
-        } finally {
-            writer.close();
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
deleted file mode 100644
index 5a5543b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-class GraceHashJoinPartitionBuildOperatorNodePushable extends
-        AbstractUnaryInputSinkOperatorNodePushable {
-    private final IHyracksTaskContext ctx;
-    private final Object stateId;
-    private final int numPartitions;
-    private final IBinaryComparator[] comparators;
-    private final FrameTupleAccessor accessor0;
-    private final ITuplePartitionComputer hpc;
-    private final FrameTupleAppender appender;
-    private IFrame[] outbufs;
-    private GraceHashJoinPartitionState state;
-
-    GraceHashJoinPartitionBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            int numPartitions, RecordDescriptor inRecordDescriptor) {
-        this.ctx = ctx;
-        this.stateId = stateId;
-        this.numPartitions = numPartitions;
-        accessor0 = new FrameTupleAccessor(inRecordDescriptor);
-        appender = new FrameTupleAppender();
-        hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories).createPartitioner();
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        for (int i = 0; i < numPartitions; i++) {
-            ByteBuffer head = outbufs[i].getBuffer();
-            accessor0.reset(head);
-            if (accessor0.getTupleCount() > 0) {
-                write(i, head);
-            }
-            closeWriter(i);
-        }
-
-        ctx.setStateObject(state);
-    }
-
-    private void closeWriter(int i) throws HyracksDataException {
-        RunFileWriter writer = state.getRunWriters()[i];
-        if (writer != null) {
-            writer.close();
-        }
-    }
-
-    private void write(int i, ByteBuffer head) throws HyracksDataException {
-        RunFileWriter writer = state.getRunWriters()[i];
-        if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                    GraceHashJoinOperatorDescriptor.class.getSimpleName());
-            writer = new RunFileWriter(file, ctx.getIOManager());
-            writer.open();
-            state.getRunWriters()[i] = writer;
-        }
-        writer.nextFrame(head);
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor0.reset(buffer);
-        int tCount = accessor0.getTupleCount();
-        for (int i = 0; i < tCount; ++i) {
-
-            int entry = hpc.partition(accessor0, i, numPartitions);
-            IFrame outbuf = outbufs[entry];
-            appender.reset(outbuf, false);
-            if (!appender.append(accessor0, i)) {
-                // buffer is full, ie. we cannot fit the tuple
-                // into the buffer -- write it to disk
-                write(entry, outbuf.getBuffer());
-                outbuf.reset();
-                appender.reset(outbuf, true);
-                if (!appender.append(accessor0, i)) {
-                    throw new HyracksDataException("Item too big to fit in frame");
-                }
-            }
-        }
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        state = new GraceHashJoinPartitionState(ctx.getJobletContext().getJobId(), stateId);
-        outbufs = new IFrame[numPartitions];
-        state.setRunWriters(new RunFileWriter[numPartitions]);
-        for (int i = 0; i < numPartitions; i++) {
-            outbufs[i] = new VSizeFrame(ctx);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
deleted file mode 100644
index a970a6c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-
-public class GraceHashJoinPartitionState extends AbstractStateObject {
-    private RunFileWriter[] fWriters;
-
-    public GraceHashJoinPartitionState(JobId jobId, Object id) {
-        super(jobId, id);
-    }
-
-    public RunFileWriter[] getRunWriters() {
-        return fWriters;
-    }
-
-    public void setRunWriters(RunFileWriter[] fWriters) {
-        this.fWriters = fWriters;
-    }
-
-    @Override
-    public void toBytes(DataOutput out) throws IOException {
-
-    }
-
-    @Override
-    public void fromBytes(DataInput in) throws IOException {
-
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
deleted file mode 100644
index 4354367..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
-import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-
-public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
-    private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
-    private final int memsize;
-    private static final long serialVersionUID = 1L;
-    private final int inputsize0;
-    private final double factor;
-    private final int recordsPerFrame;
-    private final int[] keys0;
-    private final int[] keys1;
-    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
-    private final boolean isLeftOuter;
-    private final IMissingWriterFactory[] nonMatchWriterFactories1;
-
-    /**
-     * @param spec
-     * @param memsize
-     *            in frames
-     * @param inputsize0
-     *            in frames
-     * @param recordsPerFrame
-     * @param factor
-     * @param keys0
-     * @param keys1
-     * @param hashFunctionFactories
-     * @param comparatorFactories
-     * @param recordDescriptor
-     * @throws HyracksDataException
-     */
-    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
-            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
-            IMissingWriterFactory[] nullWriterFactories1) throws HyracksDataException {
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.factor = factor;
-        this.recordsPerFrame = recordsPerFrame;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.predEvaluatorFactory = predEvalFactory;
-        this.isLeftOuter = isLeftOuter;
-        this.nonMatchWriterFactories1 = nullWriterFactories1;
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
-        ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p2Aid, p1Aid);
-
-        builder.addActivity(this, phase1);
-        builder.addSourceEdge(1, phase1, 0);
-
-        builder.addActivity(this, phase2);
-        builder.addSourceEdge(0, phase2, 0);
-
-        builder.addBlockingEdge(phase1, phase2);
-
-        builder.addTargetEdge(0, phase2, 0);
-    }
-
-    public static class BuildAndPartitionTaskState extends AbstractStateObject {
-        private RunFileWriter[] fWriters;
-        private InMemoryHashJoin joiner;
-        private int nPartitions;
-        private int memoryForHashtable;
-
-        public BuildAndPartitionTaskState() {
-        }
-
-        private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-
-    }
-
-    private class BuildAndPartitionActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId joinAid;
-
-        public BuildAndPartitionActivityNode(ActivityId id, ActivityId joinAid) {
-            super(id);
-            this.joinAid = joinAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length]
-                    : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
-                    nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter();
-                }
-            }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
-                    : predEvaluatorFactory.createPredicateEvaluator());
-
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
-                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
-                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
-                private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
-                        hashFunctionFactories).createPartitioner();
-                private final FrameTupleAppender appender = new FrameTupleAppender();
-                private final FrameTupleAppender ftappender = new FrameTupleAppender();
-                private IFrame[] bufferForPartitions;
-                private final IFrame inBuffer = new VSizeFrame(ctx);
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (state.memoryForHashtable != 0) {
-                        build(inBuffer.getBuffer());
-                    }
-
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                        accessorBuild.reset(buf);
-                        if (accessorBuild.getTupleCount() > 0) {
-                            write(i, buf);
-                        }
-                        closeWriter(i);
-                    }
-
-                    ctx.setStateObject(state);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-
-                    if (state.memoryForHashtable != memsize - 2) {
-                        accessorBuild.reset(buffer);
-                        int tCount = accessorBuild.getTupleCount();
-                        for (int i = 0; i < tCount; ++i) {
-                            int entry;
-                            if (state.memoryForHashtable == 0) {
-                                entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
-                                boolean newBuffer = false;
-                                IFrame bufBi = bufferForPartitions[entry];
-                                while (true) {
-                                    appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessorBuild, i)) {
-                                        break;
-                                    } else {
-                                        write(entry, bufBi.getBuffer());
-                                        bufBi.reset();
-                                        newBuffer = true;
-                                    }
-                                }
-                            } else {
-                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
-                                if (entry < state.memoryForHashtable) {
-                                    while (true) {
-                                        if (!ftappender.append(accessorBuild, i)) {
-                                            build(inBuffer.getBuffer());
-
-                                            ftappender.reset(inBuffer, true);
-                                        } else {
-                                            break;
-                                        }
-                                    }
-                                } else {
-                                    entry %= state.nPartitions;
-                                    boolean newBuffer = false;
-                                    IFrame bufBi = bufferForPartitions[entry];
-                                    while (true) {
-                                        appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessorBuild, i)) {
-                                            break;
-                                        } else {
-                                            write(entry, bufBi.getBuffer());
-                                            bufBi.reset();
-                                            newBuffer = true;
-                                        }
-                                    }
-                                }
-                            }
-
-                        }
-                    } else {
-                        build(buffer);
-                    }
-
-                }
-
-                private void build(ByteBuffer inBuffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.capacity());
-                    FrameUtils.copyAndFlip(inBuffer, copyBuffer);
-                    state.joiner.build(copyBuffer);
-                }
-
-                @Override
-                public void open() throws HyracksDataException {
-                    if (memsize > 1) {
-                        if (memsize > inputsize0) {
-                            state.nPartitions = 0;
-                        } else {
-                            state.nPartitions = (int) (Math
-                                    .ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
-                        }
-                        if (state.nPartitions <= 0) {
-                            // becomes in-memory HJ
-                            state.memoryForHashtable = memsize - 2;
-                            state.nPartitions = 0;
-                        } else {
-                            state.memoryForHashtable = memsize - state.nPartitions - 2;
-                            if (state.memoryForHashtable < 0) {
-                                state.memoryForHashtable = 0;
-                                state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-                            }
-                        }
-                    } else {
-                        throw new HyracksDataException("not enough memory");
-                    }
-
-                    ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
-                            .createPartitioner();
-                    ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
-                            .createPartitioner();
-                    int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                    state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
-                            isLeftOuter, nullWriters1, table, predEvaluator);
-                    bufferForPartitions = new IFrame[state.nPartitions];
-                    state.fWriters = new RunFileWriter[state.nPartitions];
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = new VSizeFrame(ctx);
-                    }
-
-                    ftappender.reset(inBuffer, true);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer == null) {
-                        FileReference file = ctx.getJobletContext()
-                                .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIOManager());
-                        writer.open();
-                        state.fWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-            };
-            return op;
-        }
-    }
-
-    private class PartitionAndJoinActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId buildAid;
-
-        public PartitionAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
-            super(id);
-            this.buildAid = buildAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length]
-                    : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
-                    nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter();
-                }
-            }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
-                    : predEvaluatorFactory.createPredicateEvaluator());
-
-            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private BuildAndPartitionTaskState state;
-                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0);
-                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
-                        hashFunctionFactories);
-                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
-                        hashFunctionFactories);
-                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
-
-                private final FrameTupleAppender appender = new FrameTupleAppender();
-                private final FrameTupleAppender ftap = new FrameTupleAppender();
-                private final IFrame inBuffer = new VSizeFrame(ctx);
-                private final IFrame outBuffer = new VSizeFrame(ctx);
-                private RunFileWriter[] buildWriters;
-                private RunFileWriter[] probeWriters;
-                private IFrame[] bufferForPartitions;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    writer.open();
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-                    buildWriters = state.fWriters;
-                    probeWriters = new RunFileWriter[state.nPartitions];
-                    bufferForPartitions = new IFrame[state.nPartitions];
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = new VSizeFrame(ctx);
-                    }
-                    appender.reset(outBuffer, true);
-                    ftap.reset(inBuffer, true);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    if (state.memoryForHashtable != memsize - 2) {
-                        accessorProbe.reset(buffer);
-                        int tupleCount0 = accessorProbe.getTupleCount();
-                        for (int i = 0; i < tupleCount0; ++i) {
-
-                            int entry;
-                            if (state.memoryForHashtable == 0) {
-                                entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
-                                boolean newBuffer = false;
-                                IFrame outbuf = bufferForPartitions[entry];
-                                while (true) {
-                                    appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessorProbe, i)) {
-                                        break;
-                                    } else {
-                                        write(entry, outbuf.getBuffer());
-                                        outbuf.reset();
-                                        newBuffer = true;
-                                    }
-                                }
-                            } else {
-                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
-                                if (entry < state.memoryForHashtable) {
-                                    while (true) {
-                                        if (!ftap.append(accessorProbe, i)) {
-                                            state.joiner.join(inBuffer.getBuffer(), writer);
-                                            ftap.reset(inBuffer, true);
-                                        } else {
-                                            break;
-                                        }
-                                    }
-
-                                } else {
-                                    entry %= state.nPartitions;
-                                    boolean newBuffer = false;
-                                    IFrame outbuf = bufferForPartitions[entry];
-                                    while (true) {
-                                        appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessorProbe, i)) {
-                                            break;
-                                        } else {
-                                            write(entry, outbuf.getBuffer());
-                                            outbuf.reset();
-                                            newBuffer = true;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    } else {
-                        state.joiner.join(buffer, writer);
-                    }
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    try {
-                        state.joiner.join(inBuffer.getBuffer(), writer);
-                        state.joiner.closeJoin(writer);
-                        ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
-                                .createPartitioner();
-                        ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
-                                .createPartitioner();
-                        if (state.memoryForHashtable != memsize - 2) {
-                            for (int i = 0; i < state.nPartitions; i++) {
-                                ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                                accessorProbe.reset(buf);
-                                if (accessorProbe.getTupleCount() > 0) {
-                                    write(i, buf);
-                                }
-                                closeWriter(i);
-                            }
-
-                            inBuffer.reset();
-                            int tableSize = -1;
-                            if (state.memoryForHashtable == 0) {
-                                tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
-                            } else {
-                                tableSize = (int) (memsize * recordsPerFrame * factor);
-                            }
-                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                            for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
-                                RunFileWriter buildWriter = buildWriters[partitionid];
-                                RunFileWriter probeWriter = probeWriters[partitionid];
-                                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                                    continue;
-                                }
-                                table.reset();
-                                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
-                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
-                                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                        nullWriters1, table, predEvaluator);
-
-                                if (buildWriter != null) {
-                                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                                    buildReader.open();
-                                    while (buildReader.nextFrame(inBuffer)) {
-                                        ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
-                                        FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                        joiner.build(copyBuffer);
-                                        inBuffer.reset();
-                                    }
-                                    buildReader.close();
-                                }
-
-                                // probe
-                                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                                probeReader.open();
-                                while (probeReader.nextFrame(inBuffer)) {
-                                    joiner.join(inBuffer.getBuffer(), writer);
-                                    inBuffer.reset();
-                                }
-                                probeReader.close();
-                                joiner.closeJoin(writer);
-                            }
-                        }
-                    } finally {
-                        writer.close();
-                    }
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = probeWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = probeWriters[i];
-                    if (writer == null) {
-                        FileReference file = ctx
-                                .createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIOManager());
-                        writer.open();
-                        probeWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    writer.fail();
-                }
-            };
-            return op;
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
deleted file mode 100644
index 43ef74d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ /dev/null
@@ -1,1032 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.File;
-
-import org.junit.Test;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import org.apache.hyracks.tests.util.NoopMissingWriterFactory;
-import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
-
-public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
-
-    /*
-     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
-     * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
-     * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
-     * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
-     * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
-     * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
-     * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
-     * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
-     * NULL, O_COMMENT VARCHAR(79) NOT NULL );
-     */
-
-    @Test
-    public void customerOrderCIDJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDGraceJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
-                spec,
-                4,
-                10,
-                200,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
-                spec,
-                5,
-                20,
-                200,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
-        for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-            nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
-        }
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 0 },
-                new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
-        for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-            nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
-        }
-
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
-                spec,
-                5,
-                20,
-                200,
-                1.2,
-                new int[] { 0 },
-                new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nonMatchWriterFactories, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
-        for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-            nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
-        }
-
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
-                spec,
-                5,
-                20,
-                200,
-                1.2,
-                new int[] { 0 },
-                new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, true, nonMatchWriterFactories);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDJoinMulti() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDGraceJoinMulti() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
-                spec,
-                3,
-                20,
-                100,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoinMulti() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
-                spec,
-                3,
-                20,
-                100,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDJoinAutoExpand() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDJoinMultiMaterialized() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordMat, NC1_ID, NC2_ID);
-
-        MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custMat, NC1_ID, NC2_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
-
-        IConnectorDescriptor custPartConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custPartConn, custScanner, 0, custMat, 0);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordMat, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custMat, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 507e1c7..0d9a6b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -61,7 +61,6 @@
 import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
 import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
@@ -195,14 +194,6 @@
                     new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                     new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
                     null);
-
-        } else if ("grace".equalsIgnoreCase(algo)) {
-            join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame, graceFactor,
-                    new int[] { 0 }, new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] {
-                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, null);
 
         } else {
             System.err.println("unknown algorithm:" + algo);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1353
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I16e9e4c73d7851f18a48c2715a6bc5c903b74eba
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <wangsaeu@yahoo.com>


Mime
View raw message