asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Preston Carman (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Range connector updates to support interval partitioning.
Date Fri, 15 Apr 2016 01:08:45 GMT
Preston Carman has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/803

Change subject: Range connector updates to support interval partitioning.
......................................................................

Range connector updates to support interval partitioning.

The range connector can now replicate data based on the partitioning properties.
Also a few unit tests have been added to ensure the correct range partitioning.

Change-Id: I551e3196d8a101cf94c084c14842aa1af11632ce
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
M asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
M asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_02/order-by-exception_02.4.query.aql
M asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.4.query.aql
M asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.5.query.aql
M asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.6.query.aql
M asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.7.query.aql
M asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
M asterixdb/asterix-om/pom.xml
M asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscRangeBinaryComparatorFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescRangeBinaryComparatorFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalAscRangeBinaryComparatorFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalDescRangeBinaryComparatorFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/AIntervalTypeComputer.java
A asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/AqlBinaryComparatorFactoryTest.java
A asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalRangePartitionComputerFactoryTest.java
A asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalBinaryComparatorFactoryTest.java
M asterixdb/pom.xml
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
R hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
R hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
C hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
R hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
D hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
M hyracks-fullstack/algebricks/algebricks-data/pom.xml
M hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IBinaryRangeComparatorFactory.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputer.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFamily.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/storage/IGrowableIntArray.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryComparatorFactory.java
A hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeAscComparatorFactory.java
A hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeDescComparatorFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/TreeIndexBufferCacheWarmup.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/PathList.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java
R hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/DoubleArrayList.java
R hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/IntArrayList.java
R hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/LongArrayList.java
71 files changed, 2,706 insertions(+), 544 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/803/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index fb81885..4f5a848 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -77,7 +77,7 @@
             }
         };
 
-        exchangeOp.setPhysicalOperator(new RandomPartitionPOperator(domain));
+        exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(domain));
         op.getInputs().get(0).setValue(exchangeOp);
         exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp));
         ExecutionMode em = ((AbstractLogicalOperator) scanOp).getExecutionMode();
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
index cdedfde..a13a9f3 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
@@ -1,7 +1,7 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- RANGE_PARTITION_MERGE_EXCHANGE [$$4(DESC)] SPLIT COUNT:3  |PARTITIONED|
+      -- RANGE_PARTITION_MERGE_EXCHANGE [$$4(DESC)] PROJECT  |PARTITIONED|
         -- STABLE_SORT [$$4(DESC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- ASSIGN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
index ba0dc6f..448d887 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
@@ -1,7 +1,7 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- RANGE_PARTITION_MERGE_EXCHANGE [$$4(ASC)] SPLIT COUNT:3  |PARTITIONED|
+      -- RANGE_PARTITION_MERGE_EXCHANGE [$$4(ASC)] PROJECT  |PARTITIONED|
         -- STABLE_SORT [$$4(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- ASSIGN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
index 3faa5ec..ac3bd0e 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
@@ -1,7 +1,7 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- RANGE_PARTITION_MERGE_EXCHANGE [$$3(DESC)] SPLIT COUNT:3  |PARTITIONED|
+      -- RANGE_PARTITION_MERGE_EXCHANGE [$$3(DESC)] PROJECT  |PARTITIONED|
         -- STABLE_SORT [$$3(DESC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- DATASOURCE_SCAN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
index e0cffaa..0b61105 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
@@ -1,7 +1,7 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- RANGE_PARTITION_MERGE_EXCHANGE [$$3(ASC)] SPLIT COUNT:3  |PARTITIONED|
+      -- RANGE_PARTITION_MERGE_EXCHANGE [$$3(ASC)] PROJECT  |PARTITIONED|
         -- DATASOURCE_SCAN  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
index 8667a4c..3f89553 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
@@ -19,6 +19,6 @@
 use dataverse TinySocial;
 
 for $user in dataset TwitterUsers
-/*+ range ["Ci", "Nb", "F"] */
+/*+ range ["Aa", "Ci", "Nb", "F", "Zb"] */
 order by $user.screen-name
 return $user;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_02/order-by-exception_02.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_02/order-by-exception_02.4.query.aql
index 02f55c6..9520dd0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_02/order-by-exception_02.4.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_02/order-by-exception_02.4.query.aql
@@ -19,6 +19,6 @@
 use dataverse TinySocial;
 
 for $user in dataset TwitterUsers
-/*+ range ["Ci", "Nb", "F"] */
+/*+ range ["Aa", "Ci", "Nb", "F", "Zb"] */
 order by $user.screen-name desc
 return $user;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.4.query.aql
index ac6f643..1fffdc2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.4.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.4.query.aql
@@ -19,6 +19,6 @@
 use dataverse TinySocial;
 
 for $user in dataset TwitterUsers
-/*+ range ["Ci", "F", "Nb"] */
+/*+ range ["Aa", "Ci", "F", "Nb", "Zz"] */
 order by $user.screen-name
 return $user;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.5.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.5.query.aql
index 10f1ba1..e6dcfbe 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.5.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.5.query.aql
@@ -19,6 +19,6 @@
 use dataverse TinySocial;
 
 for $user in dataset TwitterUsers
-/*+ range ["Nb", "F", "Ci"] */
+/*+ range ["Zz", "Nb", "F", "Ci", "Aa"] */
 order by $user.screen-name desc
 return $user;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.6.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.6.query.aql
index 628a118..88bad1a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.6.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.6.query.aql
@@ -19,6 +19,6 @@
 use dataverse TinySocial;
 
 for $user in dataset TwitterUsers
-/*+ range [100, 150, 400] */
+/*+ range [0, 100, 150, 400, 500] */
 order by $user.friends_count
 return $user;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.7.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.7.query.aql
index d478451..1fa00ea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.7.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by/order-by.7.query.aql
@@ -19,6 +19,6 @@
 use dataverse TinySocial;
 
 for $user in dataset TwitterUsers
-/*+ range [400, 150, 100] */
+/*+ range [500, 400, 150, 100, 0] */
 order by $user.friends_count desc
 return $user;
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
index 2fc4b14..e68adb1 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
@@ -65,7 +65,9 @@
 
         IParser parser = parserFactory.createParser((String) hint);
         List<Statement> hintStatements = parser.parse();
-        if (hintStatements.size() != 1) {
+        if (hintStatements.size() == 0) {
+            throw new AsterixException("No range hint was supplied to the RangeMapBuilder.");
+        } else if (hintStatements.size() != 1) {
             throw new AsterixException("Only one range statement is allowed for the range hint.");
         }
 
diff --git a/asterixdb/asterix-om/pom.xml b/asterixdb/asterix-om/pom.xml
index 48e4ffd..23e98de 100644
--- a/asterixdb/asterix-om/pom.xml
+++ b/asterixdb/asterix-om/pom.xml
@@ -16,58 +16,67 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<artifactId>apache-asterixdb</artifactId>
-		<groupId>org.apache.asterix</groupId>
-		<version>0.8.9-SNAPSHOT</version>
-	</parent>
-	<artifactId>asterix-om</artifactId>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-asterixdb</artifactId>
+        <groupId>org.apache.asterix</groupId>
+        <version>0.8.9-SNAPSHOT</version>
+    </parent>
+    <artifactId>asterix-om</artifactId>
 
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+            <comments>A business-friendly OSS license</comments>
+        </license>
+    </licenses>
 
     <properties>
         <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
     </properties>
 
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.asterix</groupId>
-			<artifactId>asterix-common</artifactId>
-			<version>0.8.9-SNAPSHOT</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.asterix</groupId>
-			<artifactId>asterix-transactions</artifactId>
-			<version>0.8.9-SNAPSHOT</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
-		</dependency>
+    <dependencies>
         <dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>algebricks-compiler</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
-		</dependency>
-		<dependency>
-        <groupId>org.reflections</groupId>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-common</artifactId>
+            <version>0.8.9-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-transactions</artifactId>
+            <version>0.8.9-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>algebricks-compiler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-test-support</artifactId>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
             <version>0.9.10</version>
             <scope>test</scope>
         </dependency>
-	</dependencies>
+    </dependencies>
 </project>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
index 59e669a..2cb3cde 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.util.GrowableArray;
-import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 
 public abstract class AbstractListBuilder implements IAsterixListBuilder {
     protected final GrowableArray outputStorage;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscRangeBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscRangeBinaryComparatorFactory.java
new file mode 100644
index 0000000..c51c6da
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscRangeBinaryComparatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+
+public class AObjectAscRangeBinaryComparatorFactory implements IBinaryRangeComparatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final AObjectAscRangeBinaryComparatorFactory INSTANCE = new AObjectAscRangeBinaryComparatorFactory();
+
+    private AObjectAscRangeBinaryComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createMinBinaryComparator() {
+        return AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    }
+
+    @Override
+    public IBinaryComparator createMaxBinaryComparator() {
+        return AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescRangeBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescRangeBinaryComparatorFactory.java
new file mode 100644
index 0000000..54199d8
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescRangeBinaryComparatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+
+public class AObjectDescRangeBinaryComparatorFactory implements IBinaryRangeComparatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final AObjectDescRangeBinaryComparatorFactory INSTANCE = new AObjectDescRangeBinaryComparatorFactory();
+
+    private AObjectDescRangeBinaryComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createMinBinaryComparator() {
+        return AObjectDescBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    }
+
+    @Override
+    public IBinaryComparator createMaxBinaryComparator() {
+        return AObjectDescBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalAscRangeBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalAscRangeBinaryComparatorFactory.java
new file mode 100644
index 0000000..121204f
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalAscRangeBinaryComparatorFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+
+public class IntervalAscRangeBinaryComparatorFactory implements IBinaryRangeComparatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IBinaryRangeComparatorFactory INSTANCE = new IntervalAscRangeBinaryComparatorFactory();
+
+    private IntervalAscRangeBinaryComparatorFactory() {
+
+    }
+
+    @Override
+    public IBinaryComparator createMinBinaryComparator() {
+        return new IBinaryComparator() {
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return Long.compare(AIntervalSerializerDeserializer.getIntervalStart(b1, s1),
+                        AInt64SerializerDeserializer.getLong(b2, s2));
+            }
+        };
+    }
+
+    @Override
+    public IBinaryComparator createMaxBinaryComparator() {
+        return new IBinaryComparator() {
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return Long.compare(AIntervalSerializerDeserializer.getIntervalEnd(b1, s1),
+                        AInt64SerializerDeserializer.getLong(b2, s2));
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalDescRangeBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalDescRangeBinaryComparatorFactory.java
new file mode 100644
index 0000000..fc14dca
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalDescRangeBinaryComparatorFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+
+public class IntervalDescRangeBinaryComparatorFactory implements IBinaryRangeComparatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IBinaryRangeComparatorFactory INSTANCE = new IntervalDescRangeBinaryComparatorFactory();
+
+    private IntervalDescRangeBinaryComparatorFactory() {
+
+    }
+
+    @Override
+    public IBinaryComparator createMinBinaryComparator() {
+        return new IBinaryComparator() {
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return -Long.compare(AIntervalSerializerDeserializer.getIntervalEnd(b1, s1),
+                        AInt64SerializerDeserializer.getLong(b2, s2));
+            }
+        };
+    }
+
+    @Override
+    public IBinaryComparator createMaxBinaryComparator() {
+        return new IBinaryComparator() {
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return -Long.compare(AIntervalSerializerDeserializer.getIntervalStart(b1, s1),
+                        AInt64SerializerDeserializer.getLong(b2, s2));
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
index 44851e3..d17acdd 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
@@ -27,7 +27,9 @@
 import org.apache.asterix.dataflow.data.nontagged.comparators.AIntervalDescPartialBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.ALinePartialBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscRangeBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectDescBinaryComparatorFactory;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectDescRangeBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.APoint3DPartialBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.APointPartialBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.APolygonPartialBinaryComparatorFactory;
@@ -35,11 +37,14 @@
 import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.BooleanBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.comparators.RawBinaryComparatorFactory;
+import org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval.IntervalAscRangeBinaryComparatorFactory;
+import org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval.IntervalDescRangeBinaryComparatorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
@@ -51,6 +56,7 @@
 import org.apache.hyracks.data.std.primitive.RawUTF8StringPointable;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringLowercasePointable;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public class AqlBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider, Serializable {
 
@@ -77,6 +83,23 @@
             ByteArrayPointable.FACTORY);
 
     private AqlBinaryComparatorFactoryProvider() {
+    }
+
+    // This method adds the option of range range
+    public IBinaryRangeComparatorFactory getRangeBinaryComparatorFactory(Object type, boolean ascending,
+            RangePartitioningType rangeType) {
+        if (type == null) {
+            return anyBinaryRangeComparatorFactory(ascending);
+        }
+        IAType aqlType = (IAType) type;
+        switch (aqlType.getTypeTag()) {
+            case INTERVAL: {
+                return addOffsetForRange(getIntervalRangeBinaryComparatorFactory(ascending), ascending);
+            }
+            default: {
+                return anyBinaryRangeComparatorFactory(ascending);
+            }
+        }
     }
 
     // This method add the option of ignoring the case in string comparisons.
@@ -173,7 +196,7 @@
                 return addOffset(ADurationPartialBinaryComparatorFactory.INSTANCE, ascending);
             }
             case INTERVAL: {
-                return addOffset(intervalBinaryComparatorFactory(ascending), ascending);
+                return addOffset(getIntervalBinaryComparatorFactory(ascending), ascending);
             }
             case UUID: {
                 return addOffset(AUUIDPartialBinaryComparatorFactory.INSTANCE, ascending);
@@ -197,7 +220,58 @@
                 final IBinaryComparator bc = inst.createBinaryComparator();
                 if (ascending) {
                     return new ABinaryComparator() {
+                        @Override
+                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
+                                throws HyracksDataException {
+                            return bc.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+                        }
+                    };
+                } else {
+                    return new ABinaryComparator() {
+                        @Override
+                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
+                                throws HyracksDataException {
+                            return -bc.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+                        }
+                    };
+                }
+            }
+        };
+    }
 
+    private IBinaryRangeComparatorFactory addOffsetForRange(final IBinaryRangeComparatorFactory inst,
+            final boolean ascending) {
+        return new IBinaryRangeComparatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IBinaryComparator createMinBinaryComparator() {
+                final IBinaryComparator bc = inst.createMinBinaryComparator();
+                if (ascending) {
+                    return new ABinaryComparator() {
+                        @Override
+                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
+                                throws HyracksDataException {
+                            return bc.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+                        }
+                    };
+                } else {
+                    return new ABinaryComparator() {
+                        @Override
+                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
+                                throws HyracksDataException {
+                            return -bc.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+                        }
+                    };
+                }
+            }
+
+            @Override
+            public IBinaryComparator createMaxBinaryComparator() {
+                final IBinaryComparator bc = inst.createMaxBinaryComparator();
+                if (ascending) {
+                    return new ABinaryComparator() {
                         @Override
                         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
                                 throws HyracksDataException {
@@ -225,7 +299,23 @@
         }
     }
 
-    private IBinaryComparatorFactory intervalBinaryComparatorFactory(boolean ascending) {
+    private IBinaryRangeComparatorFactory anyBinaryRangeComparatorFactory(boolean ascending) {
+        if (ascending) {
+            return AObjectAscRangeBinaryComparatorFactory.INSTANCE;
+        } else {
+            return AObjectDescRangeBinaryComparatorFactory.INSTANCE;
+        }
+    }
+
+    private IBinaryRangeComparatorFactory getIntervalRangeBinaryComparatorFactory(boolean ascending) {
+        if (ascending) {
+            return IntervalAscRangeBinaryComparatorFactory.INSTANCE;
+        } else {
+            return IntervalDescRangeBinaryComparatorFactory.INSTANCE;
+        }
+    }
+
+    private IBinaryComparatorFactory getIntervalBinaryComparatorFactory(boolean ascending) {
         // Intervals have separate binary comparator factories, since asc is primarily based on start point
         // and desc is similarly based on end point.
         if (ascending) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/AIntervalTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/AIntervalTypeComputer.java
index f6e3183..7b2e8f0 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/AIntervalTypeComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/AIntervalTypeComputer.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 
 public class AIntervalTypeComputer implements IResultTypeComputer {
-
+    private static final long serialVersionUID = 1L;
     public static final AIntervalTypeComputer INSTANCE = new AIntervalTypeComputer();
 
     private AIntervalTypeComputer() {
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/AqlBinaryComparatorFactoryTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/AqlBinaryComparatorFactoryTest.java
new file mode 100644
index 0000000..a6c449f
--- /dev/null
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/AqlBinaryComparatorFactoryTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.nontagged.comparators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInterval;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class AqlBinaryComparatorFactoryTest extends TestCase {
+
+    @SuppressWarnings("rawtypes")
+    private final ISerializerDeserializer serde = AObjectSerializerDeserializer.INSTANCE;
+
+    /*
+     * The following points (X) will be tested for this interval (+).
+     *
+     * ----X---XXX---X---XXX---X----
+     * ---------+++++++++++---------
+     */
+    private final int INTERVAL_LENGTH = Byte.BYTES + Byte.BYTES + Long.BYTES + Long.BYTES;
+    private final int INTEGER_LENGTH = Byte.BYTES + Long.BYTES;
+    private final AInterval[] INTERVALS = new AInterval[] { new AInterval(10, 15, (byte) 16),
+            new AInterval(10, 20, (byte) 16), new AInterval(15, 20, (byte) 16) };
+    private final AInt64[] INTEGERS = new AInt64[] { new AInt64(10l), new AInt64(15l), new AInt64(20l) };
+
+    @SuppressWarnings("unused")
+    private byte[] getIntervalBytes() throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < INTERVALS.length; ++i) {
+                serde.serialize(INTERVALS[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private byte[] getIntegerBytes() throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < INTEGERS.length; ++i) {
+                serde.serialize(INTEGERS[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void executeBinaryComparatorTests(IBinaryComparator bc, byte[] bytes, int fieldLength, int[][] results)
+            throws HyracksDataException {
+        for (int i = 0; i < results.length; ++i) {
+            int leftOffset = i * fieldLength;
+            for (int j = 0; j < results[i].length; ++j) {
+                int rightOffset = j * fieldLength;
+                int c = bc.compare(bytes, leftOffset, fieldLength, bytes, rightOffset, fieldLength);
+                Assert.assertEquals("results[" + i + "][" + j + "]", results[i][j], c);
+            }
+        }
+    }
+
+    //    private final AInterval[] INTERVALS = new AInterval[] { new AInterval(10, 20, (byte) 16),
+    //            new AInterval(10, 15, (byte) 16), new AInterval(15, 20, (byte) 16) };
+    //    private final AInt64[] INTEGERS = new AInt64[] { new AInt64(10l), new AInt64(15l), new AInt64(20l) };
+    //
+    @Test
+    public void testIntervalAsc() throws HyracksDataException {
+        IBinaryComparator bc = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        byte[] bytes = getIntervalBytes();
+        int[][] results = new int[3][];
+        results[0] = new int[] { 0, -1, -1 };
+        results[1] = new int[] { 1, 0, -1 };
+        results[2] = new int[] { 1, 1, 0 };
+        executeBinaryComparatorTests(bc, bytes, INTERVAL_LENGTH, results);
+    }
+
+    @Test
+    public void testIntervalDesc() throws HyracksDataException {
+        IBinaryComparator bc = AObjectDescBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        byte[] bytes = getIntervalBytes();
+        int[][] results = new int[3][];
+        results[0] = new int[] { 0, 1, 1 };
+        results[1] = new int[] { -1, 0, 1 };
+        results[2] = new int[] { -1, -1, 0 };
+        executeBinaryComparatorTests(bc, bytes, INTERVAL_LENGTH, results);
+    }
+
+    @Test
+    public void testIntegerAsc() throws HyracksDataException {
+        IBinaryComparator bc = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        byte[] bytes = getIntegerBytes();
+        int[][] results = new int[3][];
+        results[0] = new int[] { 0, -1, -1 };
+        results[1] = new int[] { 1, 0, -1 };
+        results[2] = new int[] { 1, 1, 0 };
+        executeBinaryComparatorTests(bc, bytes, INTEGER_LENGTH, results);
+    }
+
+    @Test
+    public void testIngeterDesc() throws HyracksDataException {
+        IBinaryComparator bc = AObjectDescBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        byte[] bytes = getIntegerBytes();
+        int[][] results = new int[3][];
+        results[0] = new int[] { 0, 1, 1 };
+        results[1] = new int[] { -1, 0, 1 };
+        results[2] = new int[] { -1, -1, 0 };
+        executeBinaryComparatorTests(bc, bytes, INTEGER_LENGTH, results);
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalRangePartitionComputerFactoryTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalRangePartitionComputerFactoryTest.java
new file mode 100644
index 0000000..6476015
--- /dev/null
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/IntervalRangePartitionComputerFactoryTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.asterix.om.base.AInterval;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
+import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class IntervalRangePartitionComputerFactoryTest extends TestCase {
+
+    private final ISerializerDeserializer<AInterval> intervalSerde = AIntervalSerializerDeserializer.INSTANCE;
+    private final Integer64SerializerDeserializer int64Serde = Integer64SerializerDeserializer.INSTANCE;
+    @SuppressWarnings("rawtypes")
+    private final ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
+            AIntervalSerializerDeserializer.INSTANCE };
+    private final RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
+
+    IBinaryRangeComparatorFactory[] BINARY_ASC_COMPARATOR_FACTORIES = new IBinaryRangeComparatorFactory[] {
+            IntervalAscRangeBinaryComparatorFactory.INSTANCE };
+    IBinaryRangeComparatorFactory[] BINARY_DESC_COMPARATOR_FACTORIES = new IBinaryRangeComparatorFactory[] {
+            IntervalDescRangeBinaryComparatorFactory.INSTANCE };
+    /*
+     * The following three intervals (+++) will be tested for these 4 partitions.
+     *
+     *    ----------+++++++++++++++++++++++++++----------
+     *    -----------+++++++++++++++++++++++++-----------
+     *    ------------+++++++++++++++++++++++------------
+     *    -----------|-----------|-----------|-----------
+     *     or 16 partitions.
+     *    --|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--
+     */
+
+    private final int FRAME_SIZE = 320;
+    private final int INTEGER_LENGTH = Long.BYTES;
+
+    private final AInterval[] INTERVALS = new AInterval[] { new AInterval(99, 301, (byte) 16),
+            new AInterval(100, 300, (byte) 16), new AInterval(101, 299, (byte) 16) };
+    private final int INTERVAL_LENGTH = Byte.BYTES + Long.BYTES + Long.BYTES;
+
+    //map          { 0l, 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l, 300l, 325l, 350l, 375l, 400l };
+    //partitions   {    0,   1,   2,   3,    4,    5,    6,    7,    8,    9,    10,   11,   12,   13,   14,  15     };
+    private final Long[] MAP_POINTS = new Long[] { 0l, 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l,
+            300l, 325l, 350l, 375l, 400l };
+
+    @SuppressWarnings("unused")
+    private byte[] getIntervalBytes(AInterval[] intervals) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < intervals.length; ++i) {
+                intervalSerde.serialize(intervals[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private byte[] getIntegerBytes(Long[] integers) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < integers.length; ++i) {
+                int64Serde.serialize(integers[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private IRangeMap getRangeMap(Long[] integers) throws HyracksDataException {
+        int offsets[] = new int[integers.length];
+        for (int i = 0; i < integers.length; ++i) {
+            offsets[i] = (i + 1) * INTEGER_LENGTH;
+        }
+        return new RangeMap(1, getIntegerBytes(integers), offsets);
+    }
+
+    private ByteBuffer prepareData(IHyracksTaskContext ctx, AInterval[] intervals) throws HyracksDataException {
+        IFrame frame = new VSizeFrame(ctx);
+        FrameFixedFieldTupleAppender fffta = new FrameFixedFieldTupleAppender(RecordDesc.getFieldCount());
+        fffta.reset(frame, true);
+
+        byte[] serializedIntervals = getIntervalBytes(intervals);
+        for (int i = 0; i < intervals.length; ++i) {
+            fffta.appendField(serializedIntervals, i * INTERVAL_LENGTH, INTERVAL_LENGTH);
+        }
+
+        return frame.getBuffer();
+    }
+
+    private void executeFieldRangePartitionTests(AInterval[] intervals, IRangeMap rangeMap,
+            IBinaryRangeComparatorFactory[] comparatorFactories, RangePartitioningType rangeType, int nParts,
+            int[][] results) throws HyracksDataException {
+        IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
+        int[] rangeFields = new int[] { 0 };
+        ITupleRangePartitionComputerFactory frpcf = new FieldRangePartitionComputerFactory(rangeFields,
+                comparatorFactories, rangeMap, rangeType);
+        ITupleRangePartitionComputer partitioner = frpcf.createPartitioner();
+
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
+        ByteBuffer buffer = prepareData(ctx, intervals);
+        accessor.reset(buffer);
+
+        IGrowableIntArray map = new IntArrayList(16, 1);
+
+        for (int i = 0; i < results.length; ++i) {
+            map.clear();
+            partitioner.partition(accessor, i, nParts, map);
+            checkPartitionResult(intervals[i], results[i], map);
+        }
+    }
+
+    private String getString(int[] results) {
+        String result = "[";
+        for (int i = 0; i < results.length; ++i) {
+            result += results[i];
+            if (i < results.length - 1) {
+                result += ", ";
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    private String getString(IGrowableIntArray results) {
+        String result = "[";
+        for (int i = 0; i < results.size(); ++i) {
+            result += results.get(i);
+            if (i < results.size() - 1) {
+                result += ", ";
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    private void checkPartitionResult(AInterval value, int[] results, IGrowableIntArray map) {
+        if (results.length != map.size()) {
+            Assert.assertEquals("The partition for value (" + value + ") gives different number of partitions",
+                    results.length, map.size());
+        }
+        for (int i = 0; i < results.length; ++i) {
+            boolean match = false;
+            for (int j = 0; j < results.length; ++j) {
+                if (results[i] == map.get(j)) {
+                    match = true;
+                    continue;
+                }
+            }
+            if (!match) {
+                Assert.assertEquals("Individual partitions for " + value + " do not match", getString(results),
+                        getString(map));
+                return;
+            }
+        }
+    }
+
+    @Test
+    public void testFieldRangePartitionAscProject16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 3 };
+        results[1] = new int[] { 4 };
+        results[2] = new int[] { 4 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionDescProject16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 3 };
+        results[1] = new int[] { 4 };
+        results[2] = new int[] { 4 };
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        IRangeMap rangeMap = getRangeMap(map);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscProjectEnd16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 12 };
+        results[1] = new int[] { 12 };
+        results[2] = new int[] { 11 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT_END, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionDescProjectEnd16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 12 };
+        results[1] = new int[] { 12 };
+        results[2] = new int[] { 11 };
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        IRangeMap rangeMap = getRangeMap(map);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT_END, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscSplit16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+        results[1] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+        results[2] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.SPLIT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionDescSplit16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+        results[1] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+        results[2] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11 };
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        IRangeMap rangeMap = getRangeMap(map);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES,
+                RangePartitioningType.SPLIT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscReplicate16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[1] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[2] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.REPLICATE, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionDescReplicate16Partitions() throws HyracksDataException {
+        int[][] results = new int[3][];
+        results[0] = new int[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[1] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[2] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        IRangeMap rangeMap = getRangeMap(map);
+
+        executeFieldRangePartitionTests(INTERVALS, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES,
+                RangePartitioningType.REPLICATE, 16, results);
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalBinaryComparatorFactoryTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalBinaryComparatorFactoryTest.java
new file mode 100644
index 0000000..d8fd8d6
--- /dev/null
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalBinaryComparatorFactoryTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInterval;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class RangeIntervalBinaryComparatorFactoryTest extends TestCase {
+
+    private final ISerializerDeserializer<AInterval> intervalSerde = AIntervalSerializerDeserializer.INSTANCE;
+    private final ISerializerDeserializer<AInt64> intSerde = AInt64SerializerDeserializer.INSTANCE;
+
+    /*
+     * The following points (X) will be tested for this interval (+).
+     *
+     * ----X---XXX---X---XXX---X----
+     * ---------+++++++++++---------
+     */
+    private final AInterval INTERVAL = new AInterval(10, 20, (byte) 16);
+    private final int INTERVAL_OFFSET = 0;
+    private int INTERVAL_LENGTH;
+    private final int POINT_LENGTH = Long.BYTES;
+    private final AInt64[] MAP_POINTS = new AInt64[] { new AInt64(5l), new AInt64(9l), new AInt64(10l), new AInt64(11l),
+            new AInt64(15l), new AInt64(19l), new AInt64(20l), new AInt64(21l), new AInt64(25l) };
+
+    private byte[] getIntervalBytes() throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            intervalSerde.serialize(INTERVAL, dos);
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private byte[] getIntegerMapBytes() throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < MAP_POINTS.length; ++i) {
+                intSerde.serialize(MAP_POINTS[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void executeBinaryComparatorTests(IBinaryComparator bc, int[] results) throws HyracksDataException {
+        // Bytes for interval
+        byte[] intervalBytes = getIntervalBytes();
+        INTERVAL_LENGTH = AIntervalSerializerDeserializer.getIntervalLength(intervalBytes, INTERVAL_OFFSET);
+
+        // Bytes for map points
+        byte[] intBytes = getIntegerMapBytes();
+
+        for (int i = 0; i < results.length; ++i) {
+            int point_offset = i * POINT_LENGTH;
+            int c = bc.compare(intervalBytes, INTERVAL_OFFSET, INTERVAL_LENGTH, intBytes, point_offset, POINT_LENGTH);
+            Assert.assertEquals(INTERVAL + " compares to map point (" + MAP_POINTS[i].getLongValue() + ")", results[i],
+                    c);
+        }
+    }
+
+    @Test
+    public void testAscMin() throws HyracksDataException {
+        IBinaryComparator bc = IntervalAscRangeBinaryComparatorFactory.INSTANCE.createMinBinaryComparator();
+        int[] results = new int[] { 1, 1, 0, -1, -1, -1, -1, -1, -1 };
+        executeBinaryComparatorTests(bc, results);
+    }
+
+    @Test
+    public void testAscMax() throws HyracksDataException {
+        IBinaryComparator bc = IntervalAscRangeBinaryComparatorFactory.INSTANCE.createMaxBinaryComparator();
+        int[] results = new int[] { 1, 1, 1, 1, 1, 1, 0, -1, -1 };
+        executeBinaryComparatorTests(bc, results);
+    }
+
+    @Test
+    public void testDescMin() throws HyracksDataException {
+        IBinaryComparator bc = IntervalDescRangeBinaryComparatorFactory.INSTANCE.createMinBinaryComparator();
+        int[] results = new int[] { -1, -1, -1, -1, -1, -1, 0, 1, 1 };
+        executeBinaryComparatorTests(bc, results);
+    }
+
+    @Test
+    public void testDescMax() throws HyracksDataException {
+        IBinaryComparator bc = IntervalDescRangeBinaryComparatorFactory.INSTANCE.createMaxBinaryComparator();
+        int[] results = new int[] { -1, -1, 0, 1, 1, 1, 1, 1, 1 };
+        executeBinaryComparatorTests(bc, results);
+    }
+
+}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index dd7a8b3..e3c21f0 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -684,6 +684,11 @@
                 <version>${hyracks.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-test-support</artifactId>
+                <version>${hyracks.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.json</groupId>
                 <artifactId>json</artifactId>
                 <version>${json.version}</version>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 9f62124..f209317 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -68,8 +68,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -155,14 +155,14 @@
                     break;
                 }
                 case RANGE_PARTITION_EXCHANGE: {
-                    RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;
+                    RangePartitionExchangePOperator concreteOp = (RangePartitionExchangePOperator) physOp;
                     for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
                         usedVariables.add(partCol.getColumn());
                     }
                     break;
                 }
                 case RANGE_PARTITION_MERGE_EXCHANGE: {
-                    RangePartitionMergePOperator concreteOp = (RangePartitionMergePOperator) physOp;
+                    RangePartitionMergeExchangePOperator concreteOp = (RangePartitionMergeExchangePOperator) physOp;
                     for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
                         usedVariables.add(partCol.getColumn());
                     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
similarity index 91%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
index 69bfe2a..bf22eb3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastExchangePOperator.java
@@ -37,13 +37,13 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
 
-public class BroadcastPOperator extends AbstractExchangePOperator {
+public class BroadcastExchangePOperator extends AbstractExchangePOperator {
 
     private INodeDomain domain;
 
-    public BroadcastPOperator(INodeDomain domain) {
+    public BroadcastExchangePOperator(INodeDomain domain) {
         this.domain = domain;
     }
 
@@ -68,7 +68,7 @@
     @Override
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
-        IConnectorDescriptor conn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor conn = new MToNBroadcastConnectorDescriptor(spec);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
index e11a64f..aedf046 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
 
 public class RandomMergeExchangePOperator extends AbstractExchangePOperator {
 
@@ -58,7 +58,7 @@
     @Override
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
-        IConnectorDescriptor conn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor conn = new MToNBroadcastConnectorDescriptor(spec);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
similarity index 96%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
index 7237d24..cba8f97 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionExchangePOperator.java
@@ -39,11 +39,11 @@
 import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 
-public class RandomPartitionPOperator extends AbstractExchangePOperator {
+public class RandomPartitionExchangePOperator extends AbstractExchangePOperator {
 
     private final INodeDomain domain;
 
-    public RandomPartitionPOperator(INodeDomain domain) {
+    public RandomPartitionExchangePOperator(INodeDomain domain) {
         this.domain = domain;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
similarity index 73%
copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
copy to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 1f70f2a..61fa883 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -19,11 +19,9 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -38,40 +36,43 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitioningConnectorDescriptor;
 
-public class RangePartitionMergePOperator extends AbstractExchangePOperator {
+public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
 
     private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
     private IRangeMap rangeMap;
+    private RangePartitioningType rangeType;
 
-    public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+    public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap,
+            RangePartitioningType rangeType) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
         this.rangeMap = rangeMap;
+        this.rangeType = rangeType;
     }
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
+        return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
     }
 
     public List<OrderColumn> getPartitioningFields() {
@@ -84,11 +85,8 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
-        for (OrderColumn oc : partitioningFields) {
-            varList.add(oc.getColumn());
-        }
-        IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+        IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain,
+                rangeMap, rangeType);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
         List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
@@ -99,23 +97,13 @@
                 break;
             }
         }
-
         this.deliveredProperties = new StructuralPropertiesVector(p, locals);
     }
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
-        List<OrderColumn> columns = new ArrayList<OrderColumn>();
-        for (OrderColumn oc : partitioningFields) {
-            LogicalVariable var = oc.getColumn();
-            columns.add(new OrderColumn(var, oc.getOrder()));
-        }
-        orderProps.add(new LocalOrderProperty(columns));
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
-                orderProps) };
-        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        return emptyUnaryRequirements();
     }
 
     @Override
@@ -123,7 +111,8 @@
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
         int n = partitioningFields.size();
         int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+        IBinaryRangeComparatorFactory[] rangeComps = new IBinaryRangeComparatorFactory[n];
+        IBinaryComparatorFactory[] binaryComps = new IBinaryComparatorFactory[n];
 
         INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
         INormalizedKeyComputerFactory nkcf = null;
@@ -139,17 +128,20 @@
                 nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
             }
             IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            rangeComps[i] = bcfp.getRangeBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC, rangeType);
+            binaryComps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
             i++;
         }
-        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
-        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+        ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps,
+                rangeMap, rangeType);
+        IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, sortFields, binaryComps,
+                nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
 
     @Override
     public String toString() {
-        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+        return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType;
     }
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
similarity index 79%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 1f70f2a..1202db6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -23,7 +23,6 @@
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -42,31 +41,36 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitionMergingConnectorDescriptor;
 
-public class RangePartitionMergePOperator extends AbstractExchangePOperator {
+public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
 
     private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
     private IRangeMap rangeMap;
+    private RangePartitioningType rangeType;
 
-    public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+    public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
+            IRangeMap rangeMap, RangePartitioningType rangeType) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
         this.rangeMap = rangeMap;
+        this.rangeType = rangeType;
     }
 
     @Override
@@ -84,11 +88,7 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
-        for (OrderColumn oc : partitioningFields) {
-            varList.add(oc.getColumn());
-        }
-        IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+        IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeMap, rangeType);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
         List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
@@ -99,7 +99,6 @@
                 break;
             }
         }
-
         this.deliveredProperties = new StructuralPropertiesVector(p, locals);
     }
 
@@ -113,8 +112,8 @@
             columns.add(new OrderColumn(var, oc.getOrder()));
         }
         orderProps.add(new LocalOrderProperty(columns));
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
-                orderProps) };
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] {
+                new StructuralPropertiesVector(null, orderProps) };
         return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
@@ -123,7 +122,8 @@
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
         int n = partitioningFields.size();
         int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+        IBinaryRangeComparatorFactory[] rangeComps = new IBinaryRangeComparatorFactory[n];
+        IBinaryComparatorFactory[] binaryComps = new IBinaryComparatorFactory[n];
 
         INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
         INormalizedKeyComputerFactory nkcf = null;
@@ -139,17 +139,20 @@
                 nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
             }
             IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            rangeComps[i] = bcfp.getRangeBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC, rangeType);
+            binaryComps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
             i++;
         }
-        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
-        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+        ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps,
+                rangeMap, rangeType);
+        IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, sortFields,
+                binaryComps, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
 
     @Override
     public String toString() {
-        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+        return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType;
     }
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
deleted file mode 100644
index 9046ce2..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-
-public class RangePartitionPOperator extends AbstractExchangePOperator {
-
-    private List<OrderColumn> partitioningFields;
-    private INodeDomain domain;
-    private IRangeMap rangeMap;
-
-    public RangePartitionPOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
-        this.partitioningFields = partitioningFields;
-        this.domain = domain;
-        this.rangeMap = rangeMap;
-    }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
-    }
-
-    public List<OrderColumn> getPartitioningFields() {
-        return partitioningFields;
-    }
-
-    public INodeDomain getDomain() {
-        return domain;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
-        this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        return emptyUnaryRequirements();
-    }
-
-    @Override
-    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
-            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
-        int n = partitioningFields.size();
-        int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-
-        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
-        INormalizedKeyComputerFactory nkcf = null;
-
-        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        int i = 0;
-        for (OrderColumn oc : partitioningFields) {
-            LogicalVariable var = oc.getColumn();
-            sortFields[i] = opSchema.findVariable(var);
-            Object type = env.getVarType(var);
-            OrderKind order = oc.getOrder();
-            if (i == 0 && nkcfProvider != null && type != null) {
-                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
-            }
-            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
-            i++;
-        }
-        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
-        IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
-        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
-    }
-
-    @Override
-    public String toString() {
-        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
-    }
-
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index 81f6e6b..4cf5194 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -124,8 +124,8 @@
             IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
         localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
-                localProps) };
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] {
+                new StructuralPropertiesVector(null, localProps) };
         return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
index 89ac374..c2751ed 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
@@ -26,8 +26,31 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
 public interface IPartitioningProperty extends IStructuralProperty {
-    enum PartitioningType {
-        UNPARTITIONED, RANDOM, BROADCAST, UNORDERED_PARTITIONED, ORDERED_PARTITIONED
+    /**
+     * The Partitioning Types define the method data is transfered between partitions and/or properties of the data.
+     */
+    public enum PartitioningType {
+        /**
+         * Data is not partitioned.
+         */
+        UNPARTITIONED,
+        /**
+         * Data is partitioned without a repeatable method.
+         */
+        RANDOM,
+        /**
+         * Data is replicated to all partitions.
+         */
+        BROADCAST,
+        /**
+         * Data is hash partitioned.
+         */
+        UNORDERED_PARTITIONED,
+        /**
+         * Data is range partitioned (only used on data that has a total order).
+         * The partitions are order based on the data range.
+         */
+        ORDERED_PARTITIONED
     }
 
     INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() {
@@ -50,7 +73,8 @@
         }
 
         @Override
-        public void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses, List<FunctionalDependency> fds) {
+        public void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
+                List<FunctionalDependency> fds) {
             // do nothing
         }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 44df740..c90aff9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -25,22 +25,36 @@
 
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public class OrderedPartitionedProperty implements IPartitioningProperty {
 
-    private ArrayList<OrderColumn> orderColumns;
+    private List<OrderColumn> orderColumns;
     private INodeDomain domain;
+    private IRangeMap rangeMap;
+    private RangePartitioningType rangeType;
 
-    public OrderedPartitionedProperty(ArrayList<OrderColumn> orderColumns, INodeDomain domain) {
+    public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap,
+            RangePartitioningType rangeType) {
         this.domain = domain;
         this.orderColumns = orderColumns;
+        this.rangeMap = rangeMap;
+        this.rangeType = rangeType;
     }
 
-    public ArrayList<OrderColumn> getOrderColumns() {
+    public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap) {
+        this.domain = domain;
+        this.orderColumns = orderColumns;
+        this.rangeMap = rangeMap;
+        this.rangeType = RangePartitioningType.PROJECT;
+    }
+
+    public List<OrderColumn> getOrderColumns() {
         return orderColumns;
     }
 
-    public ArrayList<LogicalVariable> getColumns() {
+    public List<LogicalVariable> getColumns() {
         ArrayList<LogicalVariable> cols = new ArrayList<LogicalVariable>(orderColumns.size());
         for (OrderColumn oc : orderColumns) {
             cols.add(oc.getColumn());
@@ -53,9 +67,13 @@
         return PartitioningType.ORDERED_PARTITIONED;
     }
 
+    public RangePartitioningType getRangePartitioningType() {
+        return rangeType;
+    }
+
     @Override
     public String toString() {
-        return getPartitioningType().toString() + orderColumns;
+        return getPartitioningType().toString() + " Column(s): " + orderColumns + " Range Type: " + rangeType;
     }
 
     @Override
@@ -71,6 +89,10 @@
         }
     }
 
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+
     @Override
     public INodeDomain getNodeDomain() {
         return domain;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 0b8d759..051625b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -159,9 +159,13 @@
                         OrderedPartitionedProperty or = (OrderedPartitionedProperty) reqd;
                         OrderedPartitionedProperty od = (OrderedPartitionedProperty) dlvd;
                         if (mayExpandProperties) {
-                            return isPrefixOf(od.getOrderColumns().iterator(), or.getOrderColumns().iterator());
+                            return (isPrefixOf(od.getOrderColumns().iterator(), or.getOrderColumns().iterator())
+                                    && or.getRangePartitioningType().equals(od.getRangePartitioningType())
+                                    && or.getRangeMap().equals(od.getRangeMap()));
                         } else {
-                            return od.getOrderColumns().equals(or.getOrderColumns());
+                            return (or.getOrderColumns().equals(od.getOrderColumns())
+                                    && or.getRangePartitioningType().equals(od.getRangePartitioningType())
+                                    && or.getRangeMap().equals(od.getRangeMap()));
                         }
                     }
                     default: {
@@ -208,7 +212,7 @@
         return true;
     }
 
-    public static ArrayList<OrderColumn> applyFDsToOrderColumns(ArrayList<OrderColumn> orderColumns,
+    public static List<OrderColumn> applyFDsToOrderColumns(List<OrderColumn> orderColumns,
             List<FunctionalDependency> fds) {
         // the set of vars. is ordered
         // so we try the variables in order from last to first
@@ -235,7 +239,7 @@
         return norm;
     }
 
-    public static ArrayList<OrderColumn> replaceOrderColumnsByEqClasses(ArrayList<OrderColumn> orderColumns,
+    public static List<OrderColumn> replaceOrderColumnsByEqClasses(List<OrderColumn> orderColumns,
             Map<LogicalVariable, EquivalenceClass> equivalenceClasses) {
         if (equivalenceClasses == null || equivalenceClasses.isEmpty()) {
             return orderColumns;
@@ -256,7 +260,7 @@
         return norm;
     }
 
-    private static boolean impliedByPrefix(ArrayList<OrderColumn> vars, int i, FunctionalDependency fdep) {
+    private static boolean impliedByPrefix(List<OrderColumn> vars, int i, FunctionalDependency fdep) {
         if (!fdep.getTail().contains(vars.get(i).getColumn())) {
             return false;
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-data/pom.xml b/hyracks-fullstack/algebricks/algebricks-data/pom.xml
index 2f08ec9..d6e2532 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-data/pom.xml
@@ -17,7 +17,10 @@
  ! under the License.
  !-->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project
+  xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <artifactId>algebricks-data</artifactId>
   <name>algebricks-data</name>
@@ -38,19 +41,24 @@
   </licenses>
 
   <properties>
-      <root.dir>${basedir}/../..</root.dir>
+    <root.dir>${basedir}/../..</root.dir>
   </properties>
 
   <dependencies>
-  <dependency>
-  	<groupId>org.apache.hyracks</groupId>
-  	<artifactId>algebricks-common</artifactId>
-  	<version>0.2.18-SNAPSHOT</version>
-  </dependency>
-  <dependency>
-  	<groupId>org.apache.hyracks</groupId>
-  	<artifactId>hyracks-data-std</artifactId>
-  	<version>0.2.18-SNAPSHOT</version>
-  </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-common</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-data-std</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-dataflow-common</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
index bdeb018..44e04b5 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
@@ -20,8 +20,13 @@
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public interface IBinaryComparatorFactoryProvider {
     public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending)
             throws AlgebricksException;
+
+    public IBinaryRangeComparatorFactory getRangeBinaryComparatorFactory(Object type, boolean ascending,
+            RangePartitioningType rangeType) throws AlgebricksException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 53f7dbf..1e8c853 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -51,7 +51,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
@@ -59,9 +59,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -83,12 +83,14 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
 import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
@@ -496,6 +498,7 @@
 
         op.getInputs().set(i, topOp);
         OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) topOp.getValue(), context);
+        OperatorManipulationUtil.setOperatorMode(op);
         printOp((AbstractLogicalOperator) topOp.getValue());
     }
 
@@ -540,7 +543,8 @@
                         if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
                             IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
                                     .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
-                            pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
+                            pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap,
+                                    RangePartitioningType.PROJECT);
                         } else {
                             OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
                             sortColumns = ordCols.toArray(sortColumns);
@@ -573,18 +577,37 @@
                     break;
                 }
                 case ORDERED_PARTITIONED: {
-                    pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain,
-                            null);
+                    OrderedPartitionedProperty opp = (OrderedPartitionedProperty) pp;
+                    List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
+                    List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
+                    boolean propWasSet = false;
+                    pop = null;
+                    if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
+                        AbstractLogicalOperator c = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+                        Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(c);
+                        List<FunctionalDependency> fds = context.getFDList(c);
+                        if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
+                            List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals,
+                                    cldLocals);
+                            pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeMap(),
+                                    opp.getRangePartitioningType());
+                            propWasSet = true;
+                        }
+                    }
+                    if (!propWasSet) {
+                        pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeMap(),
+                                opp.getRangePartitioningType());
+                    }
                     break;
                 }
                 case BROADCAST: {
-                    pop = new BroadcastPOperator(domain);
+                    pop = new BroadcastExchangePOperator(domain);
                     break;
                 }
                 case RANDOM: {
                     RandomPartitioningProperty rpp = (RandomPartitioningProperty) pp;
                     INodeDomain nd = rpp.getNodeDomain();
-                    pop = new RandomPartitionPOperator(nd);
+                    pop = new RandomPartitionExchangePOperator(nd);
                     break;
                 }
                 default: {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IBinaryRangeComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IBinaryRangeComparatorFactory.java
new file mode 100644
index 0000000..556156a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IBinaryRangeComparatorFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryRangeComparatorFactory extends Serializable {
+    public IBinaryComparator createMinBinaryComparator();
+    public IBinaryComparator createMaxBinaryComparator();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputer.java
new file mode 100644
index 0000000..fa51d5f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
+
+public interface ITupleRangePartitionComputer {
+    public void partition(IFrameTupleAccessor accessor, int tIndex, int nParts, IGrowableIntArray map)
+            throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
new file mode 100644
index 0000000..5406366
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITupleRangePartitionComputerFactory extends Serializable {
+    public ITupleRangePartitionComputer createPartitioner();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFamily.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFamily.java
new file mode 100644
index 0000000..9e804ba
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFamily.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITupleRangePartitionComputerFamily extends Serializable {
+    public ITupleRangePartitionComputer createPartitioner(int seed);
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/storage/IGrowableIntArray.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/storage/IGrowableIntArray.java
new file mode 100644
index 0000000..4e18b2b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/storage/IGrowableIntArray.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.storage;
+
+public interface IGrowableIntArray {
+
+    int size();
+
+    void add(int i);
+
+    int get(int i);
+
+    void clear();
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryComparatorFactory.java
index 7082e77..d1fc8a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryComparatorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryComparatorFactory.java
@@ -27,7 +27,7 @@
 public class PointableBinaryComparatorFactory implements IBinaryComparatorFactory {
     private static final long serialVersionUID = 1L;
 
-    private final IPointableFactory pf;
+    protected final IPointableFactory pf;
 
     public static PointableBinaryComparatorFactory of(IPointableFactory pf) {
         return new PointableBinaryComparatorFactory(pf);
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeAscComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeAscComparatorFactory.java
new file mode 100644
index 0000000..3cc5e8f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeAscComparatorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.accessors;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.data.std.api.IComparable;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IPointableFactory;
+
+public class PointableBinaryRangeAscComparatorFactory implements IBinaryRangeComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    protected final IPointableFactory pf;
+
+    public static PointableBinaryRangeAscComparatorFactory of(IPointableFactory pf) {
+        return new PointableBinaryRangeAscComparatorFactory(pf);
+    }
+
+    public PointableBinaryRangeAscComparatorFactory(IPointableFactory pf) {
+        this.pf = pf;
+    }
+
+    @Override
+    public IBinaryComparator createMinBinaryComparator() {
+        final IPointable p = pf.createPointable();
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                if (l1 == 0 && l2 != 0)
+                    return -1;
+                if (l1 != 0 && l2 == 0)
+                    return 1;
+                p.set(b1, s1, l1);
+                return ((IComparable) p).compareTo(b2, s2, l2);
+            }
+        };
+    }
+
+    @Override
+    public IBinaryComparator createMaxBinaryComparator() {
+        return createMinBinaryComparator();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeDescComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeDescComparatorFactory.java
new file mode 100644
index 0000000..13274ac
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/accessors/PointableBinaryRangeDescComparatorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.accessors;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.data.std.api.IComparable;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IPointableFactory;
+
+public class PointableBinaryRangeDescComparatorFactory implements IBinaryRangeComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    protected final IPointableFactory pf;
+
+    public static PointableBinaryRangeDescComparatorFactory of(IPointableFactory pf) {
+        return new PointableBinaryRangeDescComparatorFactory(pf);
+    }
+
+    public PointableBinaryRangeDescComparatorFactory(IPointableFactory pf) {
+        this.pf = pf;
+    }
+
+    @Override
+    public IBinaryComparator createMinBinaryComparator() {
+        final IPointable p = pf.createPointable();
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                if (l1 == 0 && l2 != 0)
+                    return -1;
+                if (l1 != 0 && l2 == 0)
+                    return 1;
+                p.set(b1, s1, l1);
+                return -((IComparable) p).compareTo(b2, s2, l2);
+            }
+        };
+    }
+
+    @Override
+    public IBinaryComparator createMaxBinaryComparator() {
+        return createMinBinaryComparator();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index f4da9bf..2aec7d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -20,65 +20,139 @@
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
-public class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+public class FieldRangePartitionComputerFactory implements ITupleRangePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final int[] rangeFields;
     private IRangeMap rangeMap;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private IBinaryRangeComparatorFactory[] comparatorFactories;
+    private RangePartitioningType rangeType;
 
-    public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            IRangeMap rangeMap) {
+    public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryRangeComparatorFactory[] comparatorFactories,
+            IRangeMap rangeMap, RangePartitioningType rangeType) {
         this.rangeFields = rangeFields;
         this.comparatorFactories = comparatorFactories;
         this.rangeMap = rangeMap;
+        this.rangeType = rangeType;
     }
 
-    @Override
-    public ITuplePartitionComputer createPartitioner() {
-        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+    public ITupleRangePartitionComputer createPartitioner() {
+        final IBinaryComparator[] minComparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
+            minComparators[i] = comparatorFactories[i].createMinBinaryComparator();
         }
-        return new ITuplePartitionComputer() {
-            @Override
-            /**
-             * Determine the range partition.
-             */
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+        final IBinaryComparator[] maxComparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            maxComparators[i] = comparatorFactories[i].createMaxBinaryComparator();
+        }
+        return new ITupleRangePartitionComputer() {
+            private int partionCount;
+            private double rangesPerPart = 1;
+
+            public void partition(IFrameTupleAccessor accessor, int tIndex, int nParts, IGrowableIntArray map)
+                    throws HyracksDataException {
                 if (nParts == 1) {
-                    return 0;
+                    map.add(0);
+                    return;
                 }
-                int slotIndex = getRangePartition(accessor, tIndex);
                 // Map range partition to node partitions.
-                double rangesPerPart = 1;
-                if (rangeMap.getSplitCount() + 1 > nParts) {
-                    rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
+                if (partionCount != nParts) {
+                    partionCount = nParts;
+                    if (rangeMap.getSplitCount() + 1 > nParts) {
+                        rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
+                    }
                 }
-                return (int) Math.floor(slotIndex / rangesPerPart);
+                getRangePartitions(accessor, tIndex, map);
             }
 
             /*
-             * Determine the range partition.
+             * Determine the range partitions.
              */
-            public int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                int slotIndex = 0;
-                for (int i = 0; i < rangeMap.getSplitCount(); ++i) {
-                    int c = compareSlotAndFields(accessor, tIndex, i);
-                    if (c < 0) {
-                        return slotIndex;
+            private void getRangePartitions(IFrameTupleAccessor accessor, int tIndex, IGrowableIntArray map)
+                    throws HyracksDataException {
+                switch (rangeType) {
+                    case PROJECT: {
+                        int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+                        addPartition(minPartition, map);
+                        break;
                     }
-                    slotIndex++;
+                    case PROJECT_END: {
+                        int maxPartition = getPartitionMap(
+                                binarySearchRangePartition(accessor, tIndex, maxComparators));
+                        addPartition(maxPartition, map);
+                        break;
+                    }
+                    case REPLICATE: {
+                        int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+                        int maxPartition = getPartitionMap(rangeMap.getSplitCount() + 1);
+                        for (int pid = minPartition; pid < maxPartition; ++pid) {
+                            addPartition(pid, map);
+                        }
+                        break;
+                    }
+                    case SPLIT: {
+                        int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+                        int maxPartition = getPartitionMap(
+                                binarySearchRangePartition(accessor, tIndex, maxComparators));
+                        for (int pid = minPartition; pid <= maxPartition; ++pid) {
+                            addPartition(pid, map);
+                        }
+                        break;
+                    }
                 }
-                return slotIndex;
             }
 
-            public int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int fieldIndex)
-                    throws HyracksDataException {
+            private void addPartition(int partition, IGrowableIntArray map) {
+                if (!hasPartition(partition, map)) {
+                    map.add(partition);
+                }
+            }
+
+            private int getPartitionMap(int partition) {
+                return (int) Math.floor(partition / rangesPerPart);
+            }
+
+            private boolean hasPartition(int pid, IGrowableIntArray map) {
+                for (int i = 0; i < map.size(); ++i) {
+                    if (map.get(i) == pid) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            /*
+             * Return first match or suggested index.
+             */
+            private int binarySearchRangePartition(IFrameTupleAccessor accessor, int tIndex,
+                    IBinaryComparator[] comparators) throws HyracksDataException {
+                int searchIndex = 0;
+                int left = 0;
+                int right = rangeMap.getSplitCount() - 1;
+                int cmp = 0;
+                while (left <= right) {
+                    searchIndex = (left + right) / 2;
+                    cmp = compareSlotAndFields(accessor, tIndex, searchIndex, comparators);
+                    if (cmp > 0) {
+                        left = searchIndex + 1;
+                        searchIndex += 1;
+                    } else if (cmp < 0) {
+                        right = searchIndex - 1;
+                    } else {
+                        return searchIndex + 1;
+                    }
+                }
+                return searchIndex;
+            }
+
+            private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int mapIndex,
+                    IBinaryComparator[] comparators) throws HyracksDataException {
                 int c = 0;
                 int startOffset = accessor.getTupleStartOffset(tIndex);
                 int slotLength = accessor.getFieldSlotsLength();
@@ -86,9 +160,9 @@
                     int fIdx = rangeFields[f];
                     int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
                     int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
-                    c = comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd
-                            - fStart, rangeMap.getByteArray(fieldIndex, f), rangeMap.getStartOffset(fieldIndex, f),
-                            rangeMap.getLength(fieldIndex, f));
+                    c = comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
+                            fEnd - fStart, rangeMap.getByteArray(f, mapIndex), rangeMap.getStartOffset(f, mapIndex),
+                            rangeMap.getLength(f, mapIndex));
                     if (c != 0) {
                         return c;
                     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
index 5c5f34b..ff2e40b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
@@ -18,10 +18,7 @@
  */
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
-import org.apache.hyracks.data.std.api.IPointable;
-
 public interface IRangeMap {
-    public IPointable getFieldSplit(int columnIndex, int splitIndex);
 
     public int getSplitCount();
 
@@ -32,4 +29,22 @@
     public int getLength(int columnIndex, int splitIndex);
 
     public int getTag(int columnIndex, int splitIndex);
+
+    // Min value functions
+    public byte[] getMinByteArray(int columnIndex);
+
+    public int getMinStartOffset(int columnIndex);
+
+    public int getMinLength(int columnIndex);
+
+    public int getMinTag(int columnIndex);
+
+    // Max value functions
+    public byte[] getMaxByteArray(int columnIndex);
+
+    public int getMaxStartOffset(int columnIndex);
+
+    public int getMaxLength(int columnIndex);
+
+    public int getMaxTag(int columnIndex);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
new file mode 100644
index 0000000..dcde70b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+public interface IRangePartitionType {
+    public enum RangePartitioningType {
+        /**
+         * Partitioning is determined by finding the range partition where the first data point lies.
+         */
+        PROJECT,
+        /**
+         * Partitioning is determined by finding the range partition where the last data point lies.
+         */
+        PROJECT_END,
+        /**
+         * Partitioning is determined by finding all the range partitions where the data has a point.
+         */
+        SPLIT,
+        /**
+         * Partitioning is determined by finding all the range partitions where the data has a point
+         * or comes after the data point.
+         */
+        REPLICATE
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index 98acbc0..00fb86d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -20,14 +20,15 @@
 
 import java.io.Serializable;
 
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-
 /**
  * The range map stores the field split values in an byte array.
- * The first split value for each field followed by the second split value for each field, etc.
+ * The first and last split values for each column represent the min and max values (not actually split values).
+ * <br />
+ * Example for columns A and B with three split values.
+ * {min A, min B, split 1 A, split 1 B, split 2 A, split 2 B, split 3 A, split 3 B, max A, max B}
  */
 public class RangeMap implements IRangeMap, Serializable {
+    private static final long serialVersionUID = 1L;
     private final int fields;
     private final byte[] bytes;
     private final int[] offsets;
@@ -39,16 +40,8 @@
     }
 
     @Override
-    public IPointable getFieldSplit(int columnIndex, int splitIndex) {
-        IPointable p = VoidPointable.FACTORY.createPointable();
-        int index = getFieldIndex(columnIndex, splitIndex);
-        p.set(bytes, getFieldStart(index), getFieldLength(index));
-        return p;
-    }
-
-    @Override
     public int getSplitCount() {
-        return offsets.length / fields;
+        return offsets.length / fields - 2;
     }
 
     @Override
@@ -58,21 +51,21 @@
 
     @Override
     public int getTag(int columnIndex, int splitIndex) {
-        return getFieldTag(getFieldIndex(columnIndex, splitIndex));
+        return getFieldTag(getFieldIndex(columnIndex, splitIndex + 1));
     }
 
     @Override
     public int getStartOffset(int columnIndex, int splitIndex) {
-        return getFieldStart(getFieldIndex(columnIndex, splitIndex));
+        return getFieldStart(getFieldIndex(columnIndex, splitIndex + 1));
     }
 
     @Override
     public int getLength(int columnIndex, int splitIndex) {
-        return getFieldLength(getFieldIndex(columnIndex, splitIndex));
+        return getFieldLength(getFieldIndex(columnIndex, splitIndex + 1));
     }
 
     private int getFieldIndex(int columnIndex, int splitIndex) {
-        return splitIndex * fields + columnIndex;
+        return columnIndex + splitIndex * fields;
     }
 
     private int getFieldTag(int index) {
@@ -95,4 +88,52 @@
         return length;
     }
 
+    @Override
+    public byte[] getMinByteArray(int columnIndex) {
+        return bytes;
+    }
+
+    @Override
+    public int getMinStartOffset(int columnIndex) {
+        return getFieldStart(getFieldIndex(columnIndex, getMinIndex()));
+    }
+
+    @Override
+    public int getMinLength(int columnIndex) {
+        return getFieldLength(getFieldIndex(columnIndex, getMinIndex()));
+    }
+
+    @Override
+    public int getMinTag(int columnIndex) {
+        return getFieldTag(getFieldIndex(columnIndex, getMinIndex()));
+    }
+
+    @Override
+    public byte[] getMaxByteArray(int columnIndex) {
+        return bytes;
+    }
+
+    @Override
+    public int getMaxStartOffset(int columnIndex) {
+        return getFieldStart(getFieldIndex(columnIndex, getMaxIndex()));
+    }
+
+    @Override
+    public int getMaxLength(int columnIndex) {
+        return getFieldLength(getFieldIndex(columnIndex, getMaxIndex()));
+    }
+
+    @Override
+    public int getMaxTag(int columnIndex) {
+        return getFieldTag(getFieldIndex(columnIndex, getMaxIndex()));
+    }
+
+    private int getMaxIndex() {
+        return offsets.length / fields - 1;
+    }
+
+    private int getMinIndex() {
+        return 0;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index e7e511e..a119a63 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -58,6 +58,13 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-common</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
new file mode 100644
index 0000000..b9964a8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public abstract class AbstractPartitionDataWriter implements IFrameWriter {
+    protected final int consumerPartitionCount;
+    protected final IFrameWriter[] pWriters;
+    protected final FrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final IHyracksTaskContext ctx;
+    protected boolean allocatedFrame = false;
+
+    public AbstractPartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor) throws HyracksDataException {
+        this.consumerPartitionCount = consumerPartitionCount;
+        pWriters = new IFrameWriter[consumerPartitionCount];
+        appenders = new FrameTupleAppender[consumerPartitionCount];
+        for (int i = 0; i < consumerPartitionCount; ++i) {
+            try {
+                pWriters[i] = pwFactory.createFrameWriter(i);
+                appenders[i] = new FrameTupleAppender();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < pWriters.length; ++i) {
+            if (allocatedFrame) {
+                appenders[i].write(pWriters[i], true);
+            }
+            pWriters[i].close();
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        for (int i = 0; i < pWriters.length; ++i) {
+            pWriters[i].open();
+        }
+    }
+
+    @Override
+    abstract public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+    protected void allocateFrames() throws HyracksDataException {
+        for (int i = 0; i < appenders.length; ++i) {
+            appenders[i].reset(new VSizeFrame(ctx), true);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        for (int i = 0; i < appenders.length; ++i) {
+            pWriters[i].fail();
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
new file mode 100644
index 0000000..d167543
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+
+public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    public MToNBroadcastConnectorDescriptor(IConnectorDescriptorRegistry spec) {
+        super(spec);
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        for (int i = 0; i < nConsumerPartitions; ++i) {
+            epWriters[i] = edwFactory.createFrameWriter(i);
+        }
+        return new IFrameWriter() {
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                buffer.mark();
+                for (int i = 0; i < epWriters.length; ++i) {
+                    if (i != 0) {
+                        buffer.reset();
+                    }
+                    epWriters[i].nextFrame(buffer);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (int i = 0; i < epWriters.length; ++i) {
+                    epWriters[i].fail();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                for (int i = 0; i < epWriters.length; ++i) {
+                    epWriters[i].close();
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                for (int i = 0; i < epWriters.length; ++i) {
+                    epWriters[i].open();
+                }
+            }
+        };
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        BitSet expectedPartitions = new BitSet(nProducerPartitions);
+        expectedPartitions.set(0, nProducerPartitions);
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
+                expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
new file mode 100644
index 0000000..068d11a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+import org.apache.hyracks.dataflow.std.collectors.SortMergeFrameReader;
+
+public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final ITupleRangePartitionComputerFactory tprcf;
+    private final int[] sortFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory nkcFactory;
+
+    public MToNRangePartitionMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITupleRangePartitionComputerFactory tprcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory nkcFactory) {
+        super(spec);
+        this.tprcf = tprcf;
+        this.sortFields = sortFields;
+        this.comparatorFactories = comparatorFactories;
+        this.nkcFactory = nkcFactory;
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory,
+                recordDesc, tprcf.createPartitioner());
+        return rangeWriter;
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        INormalizedKeyComputer nmkComputer = nkcFactory == null ? null : nkcFactory.createNormalizedKeyComputer();
+        IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
+        IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
+                sortFields, comparators, nmkComputer, recordDesc, pbm);
+        BitSet expectedPartitions = new BitSet();
+        expectedPartitions.set(0, nProducerPartitions);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
new file mode 100644
index 0000000..2338993
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+import org.apache.hyracks.dataflow.std.collectors.SortMergeFrameReader;
+
+public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final ITupleRangePartitionComputerFactory trpcf;
+    private final int[] sortFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory nkcFactory;
+
+    public MToNRangePartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITupleRangePartitionComputerFactory trpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory nkcFactory) {
+        super(spec);
+        this.trpcf = trpcf;
+        this.sortFields = sortFields;
+        this.comparatorFactories = comparatorFactories;
+        this.nkcFactory = nkcFactory;
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory,
+                recordDesc, trpcf.createPartitioner());
+        return rangeWriter;
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        INormalizedKeyComputer nmkComputer = nkcFactory == null ? null : nkcFactory.createNormalizedKeyComputer();
+        IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
+        IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
+                sortFields, comparators, nmkComputer, recordDesc, pbm);
+        BitSet expectedPartitions = new BitSet();
+        expectedPartitions.set(0, nProducerPartitions);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
new file mode 100644
index 0000000..f8240b8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+
+public class PartitionRangeDataWriter extends AbstractPartitionDataWriter {
+    private final ITupleRangePartitionComputer tpc;
+    private final IGrowableIntArray map;
+
+    public PartitionRangeDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITupleRangePartitionComputer tpc)
+            throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
+        this.tpc = tpc;
+        this.map = new IntArrayList(8, 8);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (!allocatedFrame) {
+            allocateFrames();
+            allocatedFrame = true;
+        }
+        tupleAccessor.reset(buffer);
+        int tupleCount = tupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            tpc.partition(tupleAccessor, i, consumerPartitionCount, map);
+            for (int h = 0; h < map.size(); ++h) {
+                FrameUtils.appendToWriter(pWriters[map.get(h)], appenders[map.get(h)], tupleAccessor, i);
+            }
+            map.clear();
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index c425c52..b8ad974 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -17,101 +17,109 @@
  ! under the License.
  !-->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<groupId>org.apache.hyracks.examples</groupId>
-	<artifactId>hyracks-integration-tests</artifactId>
-	<name>hyracks-integration-tests</name>
-	<parent>
-		<groupId>org.apache.hyracks</groupId>
-		<artifactId>hyracks-examples</artifactId>
-		<version>0.2.18-SNAPSHOT</version>
-	</parent>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.hyracks.examples</groupId>
+    <artifactId>hyracks-integration-tests</artifactId>
+    <name>hyracks-integration-tests</name>
+    <parent>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks-examples</artifactId>
+        <version>0.2.18-SNAPSHOT</version>
+    </parent>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-deploy-plugin</artifactId>
-				 <configuration>
-					<skip>true</skip>
-				 </configuration>
-				</plugin>
-		</plugins>
-	</build>
-	<properties>
-			<root.dir>${basedir}/../../..</root.dir>
-	</properties>
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-btree</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-rtree</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-lsm-btree</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-test-support</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hyracks</groupId>
-			<artifactId>hyracks-data-std</artifactId>
-			<version>0.2.18-SNAPSHOT</version>
-		</dependency>
-  	    <dependency>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <properties>
+        <root.dir>${basedir}/../../..</root.dir>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-std</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-control-cc</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-control-nc</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-btree</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-rtree</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-lsm-btree</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-test-support</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-data-std</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-common</artifactId>
+            <version>0.2.18-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hyracks</groupId>
             <artifactId>hyracks-client</artifactId>
             <version>0.2.18-SNAPSHOT</version>
@@ -124,5 +132,5 @@
             <version>1.2.2</version>
             <scope>test</scope>
         </dependency>
-	</dependencies>
+    </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index da674a4..d12d534 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -42,7 +42,7 @@
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -124,7 +124,7 @@
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn4, sorter2, 0, group2, 0);
 
-        IConnectorDescriptor conn5 = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor conn5 = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(conn5, group2, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -197,7 +197,7 @@
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn4, sorter2, 0, group2, 0);
 
-        IConnectorDescriptor conn5 = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor conn5 = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(conn5, group2, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -270,7 +270,7 @@
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn4, sorter2, 0, group2, 0);
 
-        IConnectorDescriptor conn5 = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor conn5 = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(conn5, group2, 0, printer, 0);
 
         spec.addRoot(printer);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 8232a62..570c82c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -41,7 +41,7 @@
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -655,7 +655,7 @@
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -746,7 +746,7 @@
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -837,7 +837,7 @@
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -924,7 +924,7 @@
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -1023,7 +1023,7 @@
         IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(custJoinConn, custMat, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index c28e496..73c1123 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -43,7 +43,7 @@
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -189,7 +189,7 @@
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
-        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor custJoinConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
@@ -266,10 +266,10 @@
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
-        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor custJoinConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -343,10 +343,10 @@
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
-        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor custJoinConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -425,10 +425,10 @@
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
-        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor custJoinConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
-        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
         spec.connect(joinPrinterConn, join, 0, printer, 0);
 
         spec.addRoot(printer);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
new file mode 100644
index 0000000..2cf166b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
+import org.apache.hyracks.data.std.accessors.PointableBinaryRangeAscComparatorFactory;
+import org.apache.hyracks.data.std.accessors.PointableBinaryRangeDescComparatorFactory;
+import org.apache.hyracks.data.std.api.IComparable;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IPointableFactory;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class FieldRangePartitionComputerFactoryTest extends TestCase {
+
+    private final Integer64SerializerDeserializer int64Serde = Integer64SerializerDeserializer.INSTANCE;
+    @SuppressWarnings("rawtypes")
+    private final ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
+            Integer64SerializerDeserializer.INSTANCE };
+    private final RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
+
+    IBinaryRangeComparatorFactory[] BINARY_ASC_COMPARATOR_FACTORIES = new IBinaryRangeComparatorFactory[] {
+            new PointableBinaryRangeAscComparatorFactory(LongPointable.FACTORY) };
+    IBinaryRangeComparatorFactory[] BINARY_DESC_COMPARATOR_FACTORIES = new IBinaryRangeComparatorFactory[] {
+            new PointableBinaryRangeDescComparatorFactory(LongPointable.FACTORY) };
+    IBinaryRangeComparatorFactory[] BINARY_REPLICATE_COMPARATOR_FACTORIES = new IBinaryRangeComparatorFactory[] {
+            new PointableBinaryReplicateRangeComparatorFactory(LongPointable.FACTORY) };
+    /*
+     * The following points (X) will be tested for these 4 partitions.
+     *
+     * X-------X----XXX----X----XXX----X----XXX----X-------X
+     *    -----------|-----------|-----------|-----------
+     *
+     * The following points (X) will be tested for these 16 partitions.
+     *
+     * X-------X----XXX----X----XXX----X----XXX----X-------X
+     *    --|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--
+     */
+
+    private final int FRAME_SIZE = 320;
+    private final int INTEGER_LENGTH = Long.BYTES;
+
+    //result index {      0,   1,   2,   3,    4,    5,    6,    7,    8,    9,   10,   11,   12,   13,   14,   15   };
+    //points       {     20l, 45l, 70l, 95l, 120l, 145l, 170l, 195l, 220l, 245l, 270l, 295l, 320l, 345l, 370l, 395l  };
+    private final Long[] EACH_PARTITION = new Long[] { 20l, 45l, 70l, 95l, 120l, 145l, 170l, 195l, 220l, 245l, 270l,
+            295l, 320l, 345l, 370l, 395l };
+
+    //result index {      0,   1,   2,   3,    4,    5,    6,    7,    8,    9,    10,   11,   12,   13,   14        };
+    //points       {    -25l, 50l, 99l, 100l, 101l, 150l, 199l, 200l, 201l, 250l, 299l, 300l, 301l, 350l, 425l       };
+    private final Long[] PARTITION_BOUNDRIES = new Long[] { -25l, 50l, 99l, 100l, 101l, 150l, 199l, 200l, 201l, 250l,
+            299l, 300l, 301l, 350l, 425l };
+
+    //map          { 0l, 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l, 300l, 325l, 350l, 375l, 400l };
+    //partitions   {    0,   1,   2,   3,    4,    5,    6,    7,    8,    9,    10,   11,   12,   13,   14,  15     };
+    private final Long[] MAP_POINTS = new Long[] { 0l, 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l,
+            300l, 325l, 350l, 375l, 400l };
+
+    private byte[] getIntegerBytes(Long[] integers) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < integers.length; ++i) {
+                int64Serde.serialize(integers[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private IRangeMap getRangeMap(Long[] integers) throws HyracksDataException {
+        int offsets[] = new int[integers.length];
+        for (int i = 0; i < integers.length; ++i) {
+            offsets[i] = (i + 1) * INTEGER_LENGTH;
+        }
+        return new RangeMap(1, getIntegerBytes(integers), offsets);
+    }
+
+    private ByteBuffer prepareData(IHyracksTaskContext ctx, Long[] integers) throws HyracksDataException {
+        IFrame frame = new VSizeFrame(ctx);
+        FrameFixedFieldTupleAppender fffta = new FrameFixedFieldTupleAppender(RecordDesc.getFieldCount());
+        fffta.reset(frame, true);
+
+        byte[] serializedIntegers = getIntegerBytes(integers);
+        for (int i = 0; i < integers.length; ++i) {
+            fffta.appendField(serializedIntegers, i * INTEGER_LENGTH, INTEGER_LENGTH);
+        }
+
+        return frame.getBuffer();
+    }
+
+    private void executeFieldRangePartitionTests(Long[] integers, IRangeMap rangeMap,
+            IBinaryRangeComparatorFactory[] comparatorFactories, RangePartitioningType rangeType, int nParts,
+            int[][] results) throws HyracksDataException {
+        IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
+        int[] rangeFields = new int[] { 0 };
+        ITupleRangePartitionComputerFactory frpcf = new FieldRangePartitionComputerFactory(rangeFields,
+                comparatorFactories, rangeMap, rangeType);
+        ITupleRangePartitionComputer partitioner = frpcf.createPartitioner();
+
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
+        ByteBuffer buffer = prepareData(ctx, integers);
+        accessor.reset(buffer);
+
+        IGrowableIntArray map = new IntArrayList(16, 1);
+
+        for (int i = 0; i < results.length; ++i) {
+            map.clear();
+            partitioner.partition(accessor, i, nParts, map);
+            checkPartitionResult(integers[i], results[i], map);
+        }
+    }
+
+    private String getString(int[] results) {
+        String result = "[";
+        for (int i = 0; i < results.length; ++i) {
+            result += results[i];
+            if (i < results.length - 1) {
+                result += ", ";
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    private String getString(IGrowableIntArray results) {
+        String result = "[";
+        for (int i = 0; i < results.size(); ++i) {
+            result += results.get(i);
+            if (i < results.size() - 1) {
+                result += ", ";
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    private void checkPartitionResult(Long value, int[] results, IGrowableIntArray map) {
+        if (results.length != map.size()) {
+            Assert.assertEquals("The partition for value (" + value + ") gives different number of partitions",
+                    results.length, map.size());
+        }
+        for (int i = 0; i < results.length; ++i) {
+            boolean match = false;
+            for (int j = 0; j < results.length; ++j) {
+                if (results[i] == map.get(j)) {
+                    match = true;
+                    continue;
+                }
+            }
+            if (!match) {
+                Assert.assertEquals("Individual partitions for " + value + " do not match", getString(results),
+                        getString(map));
+                return;
+            }
+        }
+    }
+
+    @Test
+    public void testFieldRangePartitionAscProject4AllPartitions() throws HyracksDataException {
+        int[][] results = new int[16][];
+        results[0] = new int[] { 0 };
+        results[1] = new int[] { 0 };
+        results[2] = new int[] { 0 };
+        results[3] = new int[] { 0 };
+        results[4] = new int[] { 1 };
+        results[5] = new int[] { 1 };
+        results[6] = new int[] { 1 };
+        results[7] = new int[] { 1 };
+        results[8] = new int[] { 2 };
+        results[9] = new int[] { 2 };
+        results[10] = new int[] { 2 };
+        results[11] = new int[] { 2 };
+        results[12] = new int[] { 3 };
+        results[13] = new int[] { 3 };
+        results[14] = new int[] { 3 };
+        results[15] = new int[] { 3 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(EACH_PARTITION, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 4, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionDescProject4AllPartitions() throws HyracksDataException {
+        int[][] results = new int[16][];
+        results[0] = new int[] { 3 };
+        results[1] = new int[] { 3 };
+        results[2] = new int[] { 3 };
+        results[3] = new int[] { 3 };
+        results[4] = new int[] { 2 };
+        results[5] = new int[] { 2 };
+        results[6] = new int[] { 2 };
+        results[7] = new int[] { 2 };
+        results[8] = new int[] { 1 };
+        results[9] = new int[] { 1 };
+        results[10] = new int[] { 1 };
+        results[11] = new int[] { 1 };
+        results[12] = new int[] { 0 };
+        results[13] = new int[] { 0 };
+        results[14] = new int[] { 0 };
+        results[15] = new int[] { 0 };
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        IRangeMap rangeMap = getRangeMap(map);
+
+        executeFieldRangePartitionTests(EACH_PARTITION, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 4, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscProject16AllPartitions() throws HyracksDataException {
+        int[][] results = new int[16][];
+        results[0] = new int[] { 0 };
+        results[1] = new int[] { 1 };
+        results[2] = new int[] { 2 };
+        results[3] = new int[] { 3 };
+        results[4] = new int[] { 4 };
+        results[5] = new int[] { 5 };
+        results[6] = new int[] { 6 };
+        results[7] = new int[] { 7 };
+        results[8] = new int[] { 8 };
+        results[9] = new int[] { 9 };
+        results[10] = new int[] { 10 };
+        results[11] = new int[] { 11 };
+        results[12] = new int[] { 12 };
+        results[13] = new int[] { 13 };
+        results[14] = new int[] { 14 };
+        results[15] = new int[] { 15 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(EACH_PARTITION, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionDescProject16AllPartitions() throws HyracksDataException {
+        int[][] results = new int[16][];
+        results[0] = new int[] { 15 };
+        results[1] = new int[] { 14 };
+        results[2] = new int[] { 13 };
+        results[3] = new int[] { 12 };
+        results[4] = new int[] { 11 };
+        results[5] = new int[] { 10 };
+        results[6] = new int[] { 9 };
+        results[7] = new int[] { 8 };
+        results[8] = new int[] { 7 };
+        results[9] = new int[] { 6 };
+        results[10] = new int[] { 5 };
+        results[11] = new int[] { 4 };
+        results[12] = new int[] { 3 };
+        results[13] = new int[] { 2 };
+        results[14] = new int[] { 1 };
+        results[15] = new int[] { 0 };
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        IRangeMap rangeMap = getRangeMap(map);
+
+        executeFieldRangePartitionTests(EACH_PARTITION, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscProject16Partitions() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0 };
+        results[1] = new int[] { 2 };
+        results[2] = new int[] { 3 };
+        results[3] = new int[] { 4 };
+        results[4] = new int[] { 4 };
+        results[5] = new int[] { 6 };
+        results[6] = new int[] { 7 };
+        results[7] = new int[] { 8 };
+        results[8] = new int[] { 8 };
+        results[9] = new int[] { 10 };
+        results[10] = new int[] { 11 };
+        results[11] = new int[] { 12 };
+        results[12] = new int[] { 12 };
+        results[13] = new int[] { 14 };
+        results[14] = new int[] { 15 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(PARTITION_BOUNDRIES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 16, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscProject4Partitions() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0 };
+        results[1] = new int[] { 0 };
+        results[2] = new int[] { 0 };
+        results[3] = new int[] { 1 };
+        results[4] = new int[] { 1 };
+        results[5] = new int[] { 1 };
+        results[6] = new int[] { 1 };
+        results[7] = new int[] { 2 };
+        results[8] = new int[] { 2 };
+        results[9] = new int[] { 2 };
+        results[10] = new int[] { 2 };
+        results[11] = new int[] { 3 };
+        results[12] = new int[] { 3 };
+        results[13] = new int[] { 3 };
+        results[14] = new int[] { 3 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(PARTITION_BOUNDRIES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES,
+                RangePartitioningType.PROJECT, 4, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscReplicate4Partitions() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0, 1, 2, 3 };
+        results[1] = new int[] { 0, 1, 2, 3 };
+        results[2] = new int[] { 0, 1, 2, 3 };
+        results[3] = new int[] { 1, 2, 3 };
+        results[4] = new int[] { 1, 2, 3 };
+        results[5] = new int[] { 1, 2, 3 };
+        results[6] = new int[] { 1, 2, 3 };
+        results[7] = new int[] { 2, 3 };
+        results[8] = new int[] { 2, 3 };
+        results[9] = new int[] { 2, 3 };
+        results[10] = new int[] { 2, 3 };
+        results[11] = new int[] { 3 };
+        results[12] = new int[] { 3 };
+        results[13] = new int[] { 3 };
+        results[14] = new int[] { 3 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(PARTITION_BOUNDRIES, rangeMap, BINARY_REPLICATE_COMPARATOR_FACTORIES,
+                RangePartitioningType.REPLICATE, 4, results);
+    }
+
+    @Test
+    public void testFieldRangePartitionAscReplicate16Partitions() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[1] = new int[] { 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[2] = new int[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[3] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[4] = new int[] { 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[5] = new int[] { 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[6] = new int[] { 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[7] = new int[] { 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[8] = new int[] { 8, 9, 10, 11, 12, 13, 14, 15 };
+        results[9] = new int[] { 10, 11, 12, 13, 14, 15 };
+        results[10] = new int[] { 11, 12, 13, 14, 15 };
+        results[11] = new int[] { 12, 13, 14, 15 };
+        results[12] = new int[] { 12, 13, 14, 15 };
+        results[13] = new int[] { 14, 15 };
+        results[14] = new int[] { 15 };
+
+        IRangeMap rangeMap = getRangeMap(MAP_POINTS);
+
+        executeFieldRangePartitionTests(PARTITION_BOUNDRIES, rangeMap, BINARY_REPLICATE_COMPARATOR_FACTORIES,
+                RangePartitioningType.REPLICATE, 16, results);
+    }
+
+    private class PointableBinaryReplicateRangeComparatorFactory implements IBinaryRangeComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        protected final IPointableFactory pf;
+
+        public PointableBinaryReplicateRangeComparatorFactory(IPointableFactory pf) {
+            this.pf = pf;
+        }
+
+        @Override
+        public IBinaryComparator createMinBinaryComparator() {
+            final IPointable p = pf.createPointable();
+            return new IBinaryComparator() {
+                @Override
+                public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                    if (l1 == 0 && l2 != 0)
+                        return -1;
+                    if (l1 != 0 && l2 == 0)
+                        return 1;
+                    p.set(b1, s1, l1);
+                    return ((IComparable) p).compareTo(b2, s2, l2);
+                }
+            };
+        }
+
+        @Override
+        public IBinaryComparator createMaxBinaryComparator() {
+            return new IBinaryComparator() {
+                @Override
+                public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                    return -1;
+                }
+            };
+        }
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
index 8513368..3b2a943 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
@@ -24,6 +24,7 @@
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,9 +41,9 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
-import org.apache.hyracks.storage.am.common.ophelpers.LongArrayList;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+import org.apache.hyracks.storage.common.arraylist.LongArrayList;
 
 public class BTreeOpContext implements IIndexOperationContext {
     private final int INIT_ARRAYLIST_SIZE = 6;
@@ -60,8 +61,8 @@
     public RangePredicate pred;
     public BTreeSplitKey splitKey;
     public LongArrayList pageLsns;
-    public IntArrayList smPages;
-    public IntArrayList freePages;
+    public IGrowableIntArray smPages;
+    public IGrowableIntArray freePages;
     public int opRestarts = 0;
     public boolean exceptionHandled;
     public IModificationOperationCallback modificationCallback;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/TreeIndexBufferCacheWarmup.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/TreeIndexBufferCacheWarmup.java
index d9013d3..4c906fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/TreeIndexBufferCacheWarmup.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/TreeIndexBufferCacheWarmup.java
@@ -22,86 +22,83 @@
 import java.util.Random;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.MathUtil;
+import org.apache.hyracks.api.storage.IGrowableIntArray;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
-import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.MathUtil;
 
 public class TreeIndexBufferCacheWarmup {
-	private final IBufferCache bufferCache;
-	private final IMetaDataPageManager freePageManager;
-	private final int fileId;
-	private final ArrayList<IntArrayList> pagesByLevel = new ArrayList<IntArrayList>();
-	private final Random rnd = new Random();
+    private final IBufferCache bufferCache;
+    private final IMetaDataPageManager metaDataPageManager;
+    private final int fileId;
+    private final ArrayList<IntArrayList> pagesByLevel = new ArrayList<IntArrayList>();
+    private final Random rnd = new Random();
 
-	public TreeIndexBufferCacheWarmup(IBufferCache bufferCache,
-			IMetaDataPageManager freePageManager, int fileId) {
-		this.bufferCache = bufferCache;
-		this.freePageManager = freePageManager;
-		this.fileId = fileId;
-	}
+    public TreeIndexBufferCacheWarmup(IBufferCache bufferCache, IMetaDataPageManager metaDataPageManager, int fileId) {
+        this.bufferCache = bufferCache;
+        this.metaDataPageManager = metaDataPageManager;
+        this.fileId = fileId;
+    }
 
-	public void warmup(ITreeIndexFrame frame,
-			ITreeIndexMetaDataFrame metaFrame, int[] warmupTreeLevels,
-			int[] warmupRepeats) throws HyracksDataException {
-		bufferCache.openFile(fileId);
+    public void warmup(ITreeIndexFrame frame, ITreeIndexMetaDataFrame metaFrame, int[] warmupTreeLevels,
+            int[] warmupRepeats) throws HyracksDataException {
+        bufferCache.openFile(fileId);
 
-		// scan entire file to determine pages in each level
-		int maxPageId = freePageManager.getMaxPage(metaFrame);
-		for (int pageId = 0; pageId <= maxPageId; pageId++) {
-			ICachedPage page = bufferCache.pin(
-					BufferedFileHandle.getDiskPageId(fileId, pageId), false);
-			page.acquireReadLatch();
-			try {
-				frame.setPage(page);
-				byte level = frame.getLevel();
-				while (level >= pagesByLevel.size()) {
-					pagesByLevel.add(new IntArrayList(100, 100));
-				}
-				if (level >= 0) {
-					// System.out.println("ADDING: " + level + " " + pageId);
-					pagesByLevel.get(level).add(pageId);
-				}
-			} finally {
-				page.releaseReadLatch();
-				bufferCache.unpin(page);
-			}
-		}
+        // scan entire file to determine pages in each level
+        int maxPageId = metaDataPageManager.getMaxPage(metaFrame);
+        for (int pageId = 0; pageId <= maxPageId; pageId++) {
+            ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            page.acquireReadLatch();
+            try {
+                frame.setPage(page);
+                byte level = frame.getLevel();
+                while (level >= pagesByLevel.size()) {
+                    pagesByLevel.add(new IntArrayList(100, 100));
+                }
+                if (level >= 0) {
+                    // System.out.println("ADDING: " + level + " " + pageId);
+                    pagesByLevel.get(level).add(pageId);
+                }
+            } finally {
+                page.releaseReadLatch();
+                bufferCache.unpin(page);
+            }
+        }
 
-		// pin certain pages again to simulate frequent access
-		for (int i = 0; i < warmupTreeLevels.length; i++) {
-			if (warmupTreeLevels[i] < pagesByLevel.size()) {
-				int repeats = warmupRepeats[i];
-				IntArrayList pageIds = pagesByLevel.get(warmupTreeLevels[i]);
-				int[] remainingPageIds = new int[pageIds.size()];
-				for (int r = 0; r < repeats; r++) {
-					for (int j = 0; j < pageIds.size(); j++) {
-						remainingPageIds[j] = pageIds.get(j);
-					}
+        // pin certain pages again to simulate frequent access
+        for (int i = 0; i < warmupTreeLevels.length; i++) {
+            if (warmupTreeLevels[i] < pagesByLevel.size()) {
+                int repeats = warmupRepeats[i];
+                IGrowableIntArray pageIds = pagesByLevel.get(warmupTreeLevels[i]);
+                int[] remainingPageIds = new int[pageIds.size()];
+                for (int r = 0; r < repeats; r++) {
+                    for (int j = 0; j < pageIds.size(); j++) {
+                        remainingPageIds[j] = pageIds.get(j);
+                    }
 
-					int remainingLength = pageIds.size();
-					for (int j = 0; j < pageIds.size(); j++) {
-						int index = MathUtil.stripSignBit(rnd.nextInt()) % remainingLength;
-						int pageId = remainingPageIds[index];
+                    int remainingLength = pageIds.size();
+                    for (int j = 0; j < pageIds.size(); j++) {
+                        int index = MathUtil.stripSignBit(rnd.nextInt()) % remainingLength;
+                        int pageId = remainingPageIds[index];
 
-						// pin & latch then immediately unlatch & unpin
-						ICachedPage page = bufferCache.pin(BufferedFileHandle
-								.getDiskPageId(fileId, pageId), false);
-						page.acquireReadLatch();
-						page.releaseReadLatch();
-						bufferCache.unpin(page);
+                        // pin & latch then immediately unlatch & unpin
+                        ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                        page.acquireReadLatch();
+                        page.releaseReadLatch();
+                        bufferCache.unpin(page);
 
-						remainingPageIds[index] = remainingPageIds[remainingLength - 1];
-						remainingLength--;
-					}
-				}
-			}
-		}
+                        remainingPageIds[index] = remainingPageIds[remainingLength - 1];
+                        remainingLength--;
+                    }
+                }
+            }
+        }
 
-		bufferCache.closeFile(fileId);
-	}
+        bufferCache.closeFile(fileId);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
index 5c2e95e..14ae226 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProvider;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
@@ -131,7 +130,8 @@
             if (c == 0) {
                 return i;
             } else {
-                int pageId = IntegerPointable.getInteger(frameTuple.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(frameTuple));
+                int pageId = IntegerPointable.getInteger(frameTuple.getFieldData(cmp.getKeyFieldCount() - 1),
+                        getChildPointerOff(frameTuple));
                 traverseList.add(pageId, -1, parentIndex);
             }
         }
@@ -216,9 +216,9 @@
 
     protected int pointerCmp(ITupleReference tupleA, ITupleReference tupleB, MultiComparator cmp)
             throws HyracksDataException {
-        return childPtrCmp
-                .compare(tupleA.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleA), childPtrSize,
-                        tupleB.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleB), childPtrSize);
+        return childPtrCmp.compare(tupleA.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleA),
+                childPtrSize, tupleB.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleB),
+                childPtrSize);
     }
 
     public int getTupleSize(ITupleReference tuple) {
@@ -295,8 +295,9 @@
         for (int i = 0; i < tupleCount; i++) {
             int tupleOff = slotManager.getTupleOff(slotManager.getSlotOff(i));
             frameTuple.resetByTupleOffset(buf, tupleOff);
-            int intVal = IntegerPointable.getInteger(buf.array(), frameTuple.getFieldStart(frameTuple.getFieldCount() - 1)
-            + frameTuple.getFieldLength(frameTuple.getFieldCount() - 1));
+            int intVal = IntegerPointable.getInteger(buf.array(),
+                    frameTuple.getFieldStart(frameTuple.getFieldCount() - 1)
+                            + frameTuple.getFieldLength(frameTuple.getFieldCount() - 1));
             ret.add(intVal);
         }
         return ret;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/PathList.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/PathList.java
index 6d59351..9056848 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/PathList.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/PathList.java
@@ -19,8 +19,8 @@
 
 package org.apache.hyracks.storage.am.rtree.impls;
 
-import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
-import org.apache.hyracks.storage.am.common.ophelpers.LongArrayList;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+import org.apache.hyracks.storage.common.arraylist.LongArrayList;
 
 public class PathList {
     private IntArrayList pageIds;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java
index cd380e1..3194674 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java
@@ -20,11 +20,10 @@
 
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparator;
 import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProvider;
-import org.apache.hyracks.storage.am.common.ophelpers.DoubleArrayList;
-import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
 import org.apache.hyracks.storage.am.rtree.impls.DoublePrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.common.arraylist.DoubleArrayList;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 
 /*
  * This compares two points based on the hilbert curve. Currently, it only supports
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java
index 0280ba5..30eb991 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java
@@ -20,10 +20,9 @@
 
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparator;
 import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProvider;
-import org.apache.hyracks.storage.am.common.ophelpers.DoubleArrayList;
 import org.apache.hyracks.storage.am.rtree.impls.DoublePrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.common.arraylist.DoubleArrayList;
 
 /*
  * This compares two points based on the z curve. For doubles, we cannot use
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java
index 2f372e2..1edcd2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java
@@ -21,7 +21,7 @@
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparator;
 import org.apache.hyracks.data.std.primitive.DoublePointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.storage.am.common.ophelpers.DoubleArrayList;
+import org.apache.hyracks.storage.common.arraylist.DoubleArrayList;
 
 /*
  * This compares two points based on the z curve. For doubles, we cannot use
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/DoubleArrayList.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/DoubleArrayList.java
similarity index 97%
rename from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/DoubleArrayList.java
rename to hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/DoubleArrayList.java
index 2a95f1e..fd2bdec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/DoubleArrayList.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/DoubleArrayList.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hyracks.storage.am.common.ophelpers;
+package org.apache.hyracks.storage.common.arraylist;
 
 public class DoubleArrayList {
     private double[] data;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IntArrayList.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/IntArrayList.java
similarity index 93%
rename from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IntArrayList.java
rename to hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/IntArrayList.java
index 25ca93c..fad1f8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IntArrayList.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/IntArrayList.java
@@ -17,9 +17,11 @@
  * under the License.
  */
 
-package org.apache.hyracks.storage.am.common.ophelpers;
+package org.apache.hyracks.storage.common.arraylist;
 
-public class IntArrayList {
+import org.apache.hyracks.api.storage.IGrowableIntArray;
+
+public class IntArrayList implements IGrowableIntArray {
     private int[] data;
     private int size;
     private int first;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/LongArrayList.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/LongArrayList.java
similarity index 97%
rename from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/LongArrayList.java
rename to hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/LongArrayList.java
index c15fe38..5a40e25 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/LongArrayList.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/arraylist/LongArrayList.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hyracks.storage.am.common.ophelpers;
+package org.apache.hyracks.storage.common.arraylist;
 
 public class LongArrayList {
 	private long[] data;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/803
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I551e3196d8a101cf94c084c14842aa1af11632ce
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Preston Carman <prestonc@apache.org>


Mime
View raw message