asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [02/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
Date Fri, 26 Feb 2016 05:53:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index a10513a..a4c87c8 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.tests.integration;
 
 import java.io.File;
+import java.util.Arrays;
 
 import org.junit.Test;
 
@@ -43,63 +44,73 @@ import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
 import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 
 public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegrationTest {
-    private static final boolean DEBUG = false;
+
+    private static boolean DEBUG = false;
+
+    static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+    static RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer() });
+
+    static RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer() });
+
+    static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()];
+    static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()];
+
+    static {
+        Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE);
+        Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE);
+    }
+
+    private IOperatorDescriptor getPrinter(JobSpecification spec, File file) {
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
+                new FileSplit[] {
+                        new FileSplit(NC1_ID, file.getAbsolutePath()) });
+
+        return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|")
+                : new NullSinkOperatorDescriptor(spec);
+    }
 
     @Test
     public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
         JobSpecification spec = new JobSpecification();
-
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
                 "data/tpch0.001/customer4.tbl"))) };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
                 "data/tpch0.001/orders4.tbl"))) };
 
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
@@ -107,13 +118,14 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
-        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
-                : new NullSinkOperatorDescriptor(spec);
+        File file = File.createTempFile(getClass().getName(), "case1");
+        IOperatorDescriptor printer = getPrinter(spec, file);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -127,6 +139,7 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
 
         spec.addRoot(printer);
         runTest(spec);
+        System.out.println("output to " + file.getAbsolutePath());
     }
 
     @Test
@@ -136,48 +149,18 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
                 "data/tpch0.001/customer3.tbl"))) };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
                 "data/tpch0.001/orders4.tbl"))) };
 
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
@@ -185,13 +168,14 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
-        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
-                : new NullSinkOperatorDescriptor(spec);
+        File file = File.createTempFile(getClass().getName(), "case2");
+        IOperatorDescriptor printer = getPrinter(spec, file);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -205,6 +189,7 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
 
         spec.addRoot(printer);
         runTest(spec);
+        System.out.println("output to " + file.getAbsolutePath());
     }
 
     @Test
@@ -215,48 +200,18 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
                 "data/tpch0.001/customer3.tbl"))) };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
                 "data/tpch0.001/orders1.tbl"))) };
 
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+                new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
@@ -264,13 +219,14 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
-        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
-                : new NullSinkOperatorDescriptor(spec);
+        File file = File.createTempFile(getClass().getName(), "case3");
+        IOperatorDescriptor printer = getPrinter(spec, file);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -284,6 +240,7 @@ public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegration
 
         spec.addRoot(printer);
         runTest(spec);
+        System.out.println("output to " + file.getAbsolutePath());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2650799..8232a62 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -292,7 +292,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null);
+                custOrderJoinDesc, null, false, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -815,7 +815,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null);
+                custOrderJoinDesc, null, false, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
index b774e0e..039497c 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
@@ -87,7 +87,6 @@ public class VSizeFrameSortMergeTest extends AbstractIntegrationTest {
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-        //                PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID );
 
         spec.setFrameSize(frameSize);
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, new int[] { 1, 0 },

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
new file mode 100644
index 0000000..b8ec790
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import org.apache.hyracks.test.support.TestUtils;
+
+public abstract class AbstractExternalGroupbyTest {
+    // Shared harness for external group-by tests; subclasses supply the build and merge operators.
+    ISerializerDeserializer[] inFields = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(),
+    };
+
+    ISerializerDeserializer[] aggrFields = new ISerializerDeserializer[] {
+            new UTF8StringSerializerDeserializer(),  // key
+            IntegerSerializerDeserializer.INSTANCE,     // sum
+            IntegerSerializerDeserializer.INSTANCE,     // count
+            FloatSerializerDeserializer.INSTANCE,       // avg
+    };
+
+    RecordDescriptor inRecordDesc = new RecordDescriptor(inFields); // input tuples: (int, string)
+
+    RecordDescriptor outputRec = new RecordDescriptor(aggrFields); // aggregated tuples: (key, sum, count, avg)
+
+    IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[] {
+            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+    INormalizedKeyComputerFactory normalizedKeyComputerFactory = new UTF8StringNormalizedKeyComputerFactory();
+    // NOTE(review): the boolean passed to each aggregator presumably selects in-state vs. in-place storage - confirm.
+    IAggregatorDescriptorFactory partialAggrInPlace = new MultiFieldsAggregatorFactory( // over input field 0
+            new IFieldAggregateDescriptorFactory[] {
+                    new IntSumFieldAggregatorFactory(0, false),
+                    new CountFieldAggregatorFactory(false),
+                    new AvgFieldGroupAggregatorFactory(0, false) });
+
+    IAggregatorDescriptorFactory finalAggrInPlace = new MultiFieldsAggregatorFactory( // over partial fields 1..3
+            new IFieldAggregateDescriptorFactory[] {
+                    new IntSumFieldAggregatorFactory(1, false),
+                    new IntSumFieldAggregatorFactory(2, false),
+                    new AvgFieldMergeAggregatorFactory(3, false) });
+
+    IAggregatorDescriptorFactory partialAggrInState = new MultiFieldsAggregatorFactory( // over input field 0
+            new IFieldAggregateDescriptorFactory[] {
+                    new IntSumFieldAggregatorFactory(0, true),
+                    new CountFieldAggregatorFactory(true),
+                    new AvgFieldGroupAggregatorFactory(0, true) });
+
+    IAggregatorDescriptorFactory finalAggrInState = new MultiFieldsAggregatorFactory( // over partial fields 1..3
+            new IFieldAggregateDescriptorFactory[] {
+                    new IntSumFieldAggregatorFactory(1, true),
+                    new IntSumFieldAggregatorFactory(2, true),
+                    new AvgFieldMergeAggregatorFactory(3, true) });
+
+    int[] keyFields = new int[] { 1 }; // group on the string field of the input record
+    int[] keyFieldsAfterPartial = new int[] { 0 }; // the key is field 0 of the aggregated record
+
+    class ResultValidateWriter implements IFrameWriter {
+        // Frame sink that checks the emitted groups against an answer computed from the input map.
+        final Map<Integer, String> keyValueMap; // source records: the value is the group key, the key is summed
+        FrameTupleAccessor resultAccessor = new FrameTupleAccessor(outputRec);
+
+        class Result { // expected sum and count for one group
+            Result(int i) {
+                sum = i;
+                count = 1;
+            }
+
+            int sum;
+            int count;
+        }
+
+        private Map<String, Result> answer; // expected results still waiting to be matched, by group key
+
+        public ResultValidateWriter(Map<Integer, String> keyValueMap) {
+            this.keyValueMap = keyValueMap;
+            answer = new HashMap<>();
+        }
+
+        @Override
+        public void open() throws HyracksDataException { // builds the expected per-group sum and count
+            for (Map.Entry<Integer, String> keyValue : keyValueMap.entrySet()) {
+                Result result = answer.get(keyValue.getValue());
+                if (result == null) {
+                    answer.put(keyValue.getValue(), new Result(keyValue.getKey()));
+                } else {
+                    result.sum += keyValue.getKey();
+                    result.count++;
+                }
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException { // verifies each tuple in the frame
+            resultAccessor.reset(buffer);
+            ByteBufferInputStream bbis = new ByteBufferInputStream();
+            DataInputStream di = new DataInputStream(bbis);
+
+            Object[] outRecord = new Object[outputRec.getFieldCount()];
+
+            for (int tid = 0; tid < resultAccessor.getTupleCount(); tid++) {
+                for (int fid = 0; fid < outputRec.getFieldCount(); fid++) {
+                    bbis.setByteBuffer(resultAccessor.getBuffer(),
+                            resultAccessor.getAbsoluteFieldStartOffset(tid, fid));
+                    outRecord[fid] = outputRec.getFields()[fid].deserialize(di);
+                }
+                Result result = answer.remove((String) outRecord[0]); // removed so close() can detect missing groups
+                assertNotNull(result);
+                assertEquals(result.sum, (int) outRecord[1]);
+                assertEquals(result.count, (int) outRecord[2]); // the avg field (index 3) is not asserted
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            Assert.fail(); // a fail() call is itself a test failure
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            assertEquals(0, answer.size()); // every expected group must have been emitted
+        }
+    }
+
+    @Test
+    public void testBuildAndMergeNormalFrameInMem() throws HyracksDataException { // minDataSize is one frame
+        int tableSize = 1001;
+        int numFrames = 3;
+        int frameSize = 256;
+        int minDataSize = frameSize;
+        int minRecordSize = 20;
+        int maxRecordSize = 50;
+        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
+    }
+
+    @Test
+    public void testBuildAndMergeNormalFrameSpill() throws HyracksDataException { // minDataSize exceeds numFrames
+        int tableSize = 1001;
+        int numFrames = 3;
+        int frameSize = 256;
+        int minDataSize = frameSize * 4;
+        int minRecordSize = 20;
+        int maxRecordSize = 50;
+        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
+    }
+
+    @Test
+    public void testBuildAndMergeBigObj() throws HyracksDataException { // adds records from generateBigObject
+        int tableSize = 1001;
+        int numFrames = 4;
+        int frameSize = 256;
+        int minDataSize = frameSize * 5;
+        int minRecordSize = 20;
+        int maxRecordSize = 50;
+        HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 2);
+        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize,
+                bigRecords);
+
+    }
+    // Hook called once per test, before getBuilder() and getMerger() are used.
+    protected abstract void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException;
+
+    protected abstract IFrameWriter getBuilder(); // receives the generated input frames
+
+    protected abstract IOperatorNodePushable getMerger(); // writes the final groups to the validating writer
+
+    private void testBuildAndMerge(int tableSize, int numFrames, int frameSize, int minDataSize,
+            int minRecordSize, int maxRecordSize,
+            Map<Integer, String> specialData)
+            throws HyracksDataException {
+        // Generates input, pushes it through the builder, then runs the merger into a ResultValidateWriter.
+        IHyracksTaskContext ctx = TestUtils.create(frameSize);
+        initial(ctx, tableSize, numFrames);
+        ArrayList<IFrame> input = new ArrayList<>();
+        Map<Integer, String> keyValueMap = new HashMap<>();
+        AbstractRunGeneratorTest
+                .prepareData(ctx, input, minDataSize, minRecordSize, maxRecordSize, specialData, keyValueMap);
+
+        ResultValidateWriter writer = new ResultValidateWriter(keyValueMap);
+
+        getBuilder().open();
+        for (IFrame frame : input) {
+            getBuilder().nextFrame(frame.getBuffer());
+        }
+        getBuilder().close();
+
+        getMerger().setOutputFrameWriter(0, writer, outputRec);
+        getMerger().initialize(); // the merger is started only after the build side has been closed
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index 3cc2a23..673c6fa 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -45,8 +45,8 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
 import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
 import org.apache.hyracks.test.support.TestUtils;
@@ -54,8 +54,8 @@ import org.junit.Test;
 
 public abstract class AbstractRunGeneratorTest {
     static TestUtils testUtils = new TestUtils();
-    static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+    static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer() };
     static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
     static Random GRandom = new Random(System.currentTimeMillis());
     static int[] SortFields = new int[] { 0, 1 };
@@ -63,17 +63,17 @@ public abstract class AbstractRunGeneratorTest {
             PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
             PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
 
-    static void assertMaxFrameSizesAreAllEqualsTo(List<RunAndMaxFrameSizePair> maxSize, int pageSize) {
+    static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) {
         for (int i = 0; i < maxSize.size(); i++) {
-            assertTrue(maxSize.get(i).maxFrameSize == pageSize);
+            assertTrue(maxSize.get(i).getMaxFrameSize() == pageSize);
         }
     }
 
     abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
             throws HyracksDataException;
 
-    protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns,
-            int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+    protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
+            int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
         IHyracksTaskContext ctx = testUtils.create(pageSize);
 
         HashMap<Integer, String> keyValuePair = new HashMap<>();
@@ -90,12 +90,12 @@ public abstract class AbstractRunGeneratorTest {
         return runGenerator.getRuns();
     }
 
-    static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
+    static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
             Map<Integer, String> keyValuePair) throws HyracksDataException {
         HashMap<Integer, String> copyMap2 = new HashMap<>(keyValuePair);
         int maxFrameSizes = 0;
-        for (RunAndMaxFrameSizePair run : runs) {
-            maxFrameSizes = Math.max(maxFrameSizes, run.maxFrameSize);
+        for (GeneratedRunFileReader run : runs) {
+            maxFrameSizes = Math.max(maxFrameSizes, run.getMaxFrameSize());
         }
         GroupVSizeFrame gframe = new GroupVSizeFrame(ctx, maxFrameSizes);
         GroupFrameAccessor gfta = new GroupFrameAccessor(ctx.getInitialFrameSize(), RecordDesc);
@@ -125,25 +125,25 @@ public abstract class AbstractRunGeneratorTest {
         return preKey;
     }
 
-    static void assertReadSorted(List<RunAndMaxFrameSizePair> runs, IFrameTupleAccessor fta, IFrame frame,
+    static void assertReadSorted(List<GeneratedRunFileReader> runs, IFrameTupleAccessor fta, IFrame frame,
             Map<Integer, String> keyValuePair) throws HyracksDataException {
 
         assertTrue(runs.size() > 0);
-        for (RunAndMaxFrameSizePair run : runs) {
-            run.run.open();
+        for (GeneratedRunFileReader run : runs) {
+            run.open();
             int preKey = Integer.MIN_VALUE;
-            while (run.run.nextFrame(frame)) {
+            while (run.nextFrame(frame)) {
                 fta.reset(frame.getBuffer());
                 preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
             }
-            run.run.close();
+            run.close();
         }
         assertTrue(keyValuePair.isEmpty());
     }
 
     static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
             int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
         FrameTupleAppender appender = new FrameTupleAppender();
@@ -223,7 +223,7 @@ public abstract class AbstractRunGeneratorTest {
         int numRuns = 2;
         int minRecordSize = pageSize / 8;
         int maxRecordSize = pageSize / 8;
-        List<RunAndMaxFrameSizePair> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+        List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
                 maxRecordSize, null);
         assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
     }
@@ -235,8 +235,8 @@ public abstract class AbstractRunGeneratorTest {
         int numRuns = 2;
         int minRecordSize = pageSize;
         int maxRecordSize = (int) (pageSize * 1.8);
-        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
-                maxRecordSize, null);
+        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+                null);
         assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
     }
 
@@ -248,12 +248,12 @@ public abstract class AbstractRunGeneratorTest {
         int minRecordSize = 20;
         int maxRecordSize = pageSize / 2;
         HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
-        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
-                maxRecordSize, specialPair);
+        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+                specialPair);
 
         int max = 0;
-        for (RunAndMaxFrameSizePair run : size) {
-            max = Math.max(max, run.maxFrameSize);
+        for (GeneratedRunFileReader run : size) {
+            max = Math.max(max, run.getMaxFrameSize());
         }
         assertTrue(max == pageSize * (frameLimit - 1));
     }
@@ -266,8 +266,8 @@ public abstract class AbstractRunGeneratorTest {
         HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
         int minRecordSize = 10;
         int maxRecordSize = pageSize / 2;
-        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
-                maxRecordSize, specialPair);
+        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+                specialPair);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
new file mode 100644
index 0000000..f1a4231
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupBuildOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupWriteOperatorNodePushable;
+
+public class ExternalHashGroupbyTest extends AbstractExternalGroupbyTest { // hash-based build and write operators
+    ExternalGroupBuildOperatorNodePushable buildOperator;
+    ExternalGroupWriteOperatorNodePushable mergeOperator;
+
+    @Override
+    protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) {
+        ISpillableTableFactory tableFactory = new HashSpillableTableFactory( // one table factory for both operators
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE });
+        // NOTE(review): this.hashCode() is passed to both operators, presumably as a shared state id - confirm.
+        buildOperator = new ExternalGroupBuildOperatorNodePushable(ctx, this.hashCode(), tableSize,
+                numFrames * ctx.getInitialFrameSize(), keyFields, numFrames, comparatorFactories,
+                normalizedKeyComputerFactory, partialAggrInPlace, inRecordDesc, outputRec, tableFactory);
+        mergeOperator = new ExternalGroupWriteOperatorNodePushable(ctx, this.hashCode(), tableFactory, outputRec,
+                outputRec, numFrames, keyFieldsAfterPartial, normalizedKeyComputerFactory, comparatorFactories,
+                finalAggrInPlace);
+    }
+
+    @Override
+    protected IFrameWriter getBuilder() {
+        return buildOperator; // set by initial()
+    }
+
+    @Override
+    protected AbstractUnaryOutputSourceOperatorNodePushable getMerger() {
+        return mergeOperator; // set by initial()
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index e6d10f2..567b7df 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -55,10 +55,9 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
 import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
 import org.junit.Test;
@@ -234,8 +233,8 @@ public class RunMergingFrameReaderTest {
             List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
             List<TestFrameReader> readerList = new ArrayList<>(numRuns);
             List<IFrame> frameList = new ArrayList<>(numRuns);
-            prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize,
-                    readerList, frameList, keyValueMapList);
+            prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+                    frameList, keyValueMapList);
 
             RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
                     Comparators, null, RecordDesc, topK);
@@ -313,8 +312,8 @@ public class RunMergingFrameReaderTest {
         int maxRecordSize = pageSize / 2;
 
         IHyracksTaskContext ctx = testUtils.create(pageSize);
-        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
-                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
+        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
+                RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
 
         runGenerator.open();
         Map<Integer, String> keyValuePair = new HashMap<>();
@@ -336,20 +335,19 @@ public class RunMergingFrameReaderTest {
         }
         runGenerator.close();
         List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
-        for (RunAndMaxFrameSizePair max : runGenerator.getRuns()) {
-            inFrame.add(new GroupVSizeFrame(ctx, max.maxFrameSize));
+        for (GeneratedRunFileReader max : runGenerator.getRuns()) {
+            inFrame.add(new GroupVSizeFrame(ctx, max.getMaxFrameSize()));
         }
 
         // Let each run file reader not delete the run file when it is read and closed.
-        for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
-            RunFileReader runFileReader = (RunFileReader) run.run;
-            PA.setValue(runFileReader, "deleteAfterClose", false);
+        for (GeneratedRunFileReader run : runGenerator.getRuns()) {
+            PA.setValue(run, "deleteAfterClose", false);
         }
         matchResult(ctx, runGenerator.getRuns(), keyValuePair);
 
         List<IFrameReader> runs = new ArrayList<>();
-        for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
-            runs.add(run.run);
+        for (GeneratedRunFileReader run : runGenerator.getRuns()) {
+            runs.add(run);
         }
         RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
                 RecordDesc);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
new file mode 100644
index 0000000..bcf661f
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.sort.ExternalSortGroupByRunGenerator;
+import org.apache.hyracks.dataflow.std.group.sort.ExternalSortGroupByRunMerger;
+import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+public class SortGroupbyTest extends AbstractExternalGroupbyTest { // sort-based run generator plus run merger
+    ExternalSortGroupByRunGenerator builder;
+
+    IOperatorNodePushable mergerOperator;
+
+    @Override
+    protected void initial(final IHyracksTaskContext ctx, int tableSize, final int numFrames) // tableSize is unused
+            throws HyracksDataException {
+        builder = new ExternalSortGroupByRunGenerator(ctx, keyFields, inRecordDesc, numFrames, keyFields,
+                normalizedKeyComputerFactory, comparatorFactories, partialAggrInState, outputRec, Algorithm.QUICK_SORT);
+
+        mergerOperator = new AbstractUnaryOutputSourceOperatorNodePushable() { // wraps the merger as a pushable
+            @Override
+            public void initialize() throws HyracksDataException {
+                List<GeneratedRunFileReader> runs = builder.getRuns(); // runs and sorter are read from the builder
+                ISorter sorter = builder.getSorter();
+                IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                for (int i = 0; i < comparatorFactories.length; ++i) {
+                    comparators[i] = comparatorFactories[i].createBinaryComparator();
+                }
+                INormalizedKeyComputer nmkComputer = normalizedKeyComputerFactory == null ? null
+                        : normalizedKeyComputerFactory.createNormalizedKeyComputer();
+                // NOTE(review): 'writer' is not declared in this file; presumably inherited from the base class.
+                AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields,
+                        inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators,
+                        partialAggrInState, finalAggrInState, true);
+                merger.process();
+            }
+        };
+    }
+
+    @Override
+    protected IFrameWriter getBuilder() {
+        return builder; // set by initial()
+    }
+
+    @Override
+    protected IOperatorNodePushable getMerger() {
+        return mergerOperator; // set by initial()
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/text-example/textclient/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index 3bf69e2..a88c421 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -67,24 +67,7 @@
               <goal>assemble</goal>
             </goals>
           </execution>
-          <execution>
-          	<id>groupclient</id>
-            <configuration>
-              <programs>
-                <program>
-                  <mainClass>org.apache.hyracks.examples.text.client.ExternalGroupClient</mainClass>
-                  <name>groupclient</name>
-                </program>
-              </programs>
-              <repositoryLayout>flat</repositoryLayout>
-              <repositoryName>lib</repositoryName>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>assemble</goal>
-            </goals>
-          </execution>
-        </executions>
+       </executions>
       </plugin>
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
deleted file mode 100644
index 1ae3258..0000000
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.examples.text.client;
-
-import java.io.File;
-
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-
-/**
- * The application client for the performance tests of the external hash group
- * operator.
- */
-public class ExternalGroupClient {
-    private static class Options {
-        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
-        public String host;
-
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
-        public int port = 1098;
-
-        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
-        public String inFileSplits;
-
-        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
-        public String outFileSplits;
-
-        @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
-        public int htSize = 8191;
-
-        @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
-        public int frameSize = 32768;
-
-        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
-        public int sbSize = 512;
-
-        @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
-        public boolean sortOutput = false;
-
-        @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
-        public boolean outPlain = true;
-
-        @Option(name = "-algo", usage = "The algorithm to be used", required = true)
-        public int algo;
-    }
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        parser.parseArgument(args);
-
-        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
-
-        JobSpecification job;
-
-        for (int i = 0; i < 6; i++) {
-            long start = System.currentTimeMillis();
-            job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
-                    options.htSize, options.sbSize, options.frameSize, options.sortOutput, options.algo,
-                    options.outPlain);
-
-            System.out.print(i + "\t" + (System.currentTimeMillis() - start));
-            start = System.currentTimeMillis();
-            JobId jobId = hcc.startJob(job);
-            hcc.waitForCompletion(jobId);
-            System.out.println("\t" + (System.currentTimeMillis() - start));
-        }
-    }
-
-    private static FileSplit[] parseFileSplits(String fileSplits) {
-        String[] splits = fileSplits.split(",");
-        FileSplit[] fSplits = new FileSplit[splits.length];
-        for (int i = 0; i < splits.length; ++i) {
-            String s = splits[i].trim();
-            int idx = s.indexOf(':');
-            if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
-            }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
-        }
-        return fSplits;
-    }
-
-    private static FileSplit[] parseFileSplits(String fileSplits, int count) {
-        String[] splits = fileSplits.split(",");
-        FileSplit[] fSplits = new FileSplit[splits.length];
-        for (int i = 0; i < splits.length; ++i) {
-            String s = splits[i].trim();
-            int idx = s.indexOf(':');
-            if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
-            }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
-                    + count)));
-        }
-        return fSplits;
-    }
-
-    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
-            int frameSize, boolean sortOutput, int alg, boolean outPlain) {
-        JobSpecification spec = new JobSpecification(frameSize);
-        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
-
-        RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
-
-        createPartitionConstraint(spec, fileScanner, inSplits);
-
-        // Output: each unique string with an integer count
-        RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE,
-                // IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE });
-
-        // Specify the grouping key, which will be the string extracted during
-        // the scan.
-        int[] keys = new int[] { 0,
-        // 1
-        };
-
-        AbstractOperatorDescriptor grouper;
-
-        switch (alg) {
-            case 0: // new external hash graph
-                grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
-                        keys, frameSize, new IBinaryComparatorFactory[] {
-                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
-                        new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
-                                        false) }), outDesc, new HashSpillableTableFactory(
-                                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                                // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                                PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
-                createPartitionConstraint(spec, grouper, outSplits);
-
-                // Connect scanner with the grouper
-                IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
-                spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
-
-                break;
-            case 1: // External-sort + new-precluster
-                ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, frameSize, keys,
-                        new IBinaryComparatorFactory[] {
-                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
-                createPartitionConstraint(spec, sorter2, inSplits);
-
-                // Connect scan operator with the sorter
-                IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
-                spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
-
-                grouper = new org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor(
-                        spec, keys, new IBinaryComparatorFactory[] {
-                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        outDesc);
-
-                createPartitionConstraint(spec, grouper, outSplits);
-
-                // Connect sorter with the pre-cluster
-                OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(spec);
-                spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
-                break;
-            case 2: // Inmem
-                grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] {
-                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }),
-                        new IBinaryComparatorFactory[] {
-                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        outDesc, htSize);
-
-                createPartitionConstraint(spec, grouper, outSplits);
-
-                // Connect scanner with the grouper
-                IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
-                spec.connect(scanConn2, fileScanner, 0, grouper, 0);
-                break;
-            default:
-                grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
-                        keys, frameSize, new IBinaryComparatorFactory[] {
-                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
-                        new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
-                                        false) }), outDesc, new HashSpillableTableFactory(
-                                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                                // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                                PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
-                createPartitionConstraint(spec, grouper, outSplits);
-
-                // Connect scanner with the grouper
-                IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
-                spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
-        }
-
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-
-        AbstractSingleActivityOperatorDescriptor writer;
-
-        if (outPlain)
-            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
-        else
-            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
-
-        createPartitionConstraint(spec, writer, outSplits);
-
-        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(groupOutConn, grouper, 0, writer, 0);
-
-        spec.addRoot(writer);
-        return spec;
-    }
-
-    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
-        String[] parts = new String[splits.length];
-        for (int i = 0; i < splits.length; ++i) {
-            parts[i] = splits[i].getNodeName();
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
-    }
-}


Mime
View raw message