asterixdb-commits mailing list archives

From jianf...@apache.org
Subject [01/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
Date Fri, 26 Feb 2016 05:53:58 GMT
Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master c20aa64af -> 6abc63e23


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index b9cbb9d..22a571f 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -21,9 +21,6 @@ package org.apache.hyracks.examples.text.client;
 import java.io.File;
 import java.util.EnumSet;
 
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
@@ -31,6 +28,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileReference;
@@ -39,6 +37,7 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -52,14 +51,19 @@ import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import org.apache.hyracks.examples.text.WordTupleParserFactory;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
 
 public class WordCountMain {
     private static class Options {
@@ -84,8 +88,8 @@ public class WordCountMain {
         @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
         public int htSize = 8191;
 
-        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
-        public int sbSize = 32768;
+        @Option(name = "-frame-limit", usage = "Memory limit in frames (default: 10)", required = false)
+        public int memFrameLimit = 10;
 
         @Option(name = "-runtime-profiling", usage = "Indicates if runtime profiling should be enabled. (default: false)")
         public boolean runtimeProfiling = false;
@@ -94,6 +98,8 @@ public class WordCountMain {
         public int frameSize = 32768;
     }
 
+    private static long fileSize = 0;
+
     public static void main(String[] args) throws Exception {
         Options options = new Options();
         CmdLineParser parser = new CmdLineParser(options);
@@ -102,7 +108,7 @@ public class WordCountMain {
         IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
-                options.algo, options.htSize, options.sbSize, options.format, options.frameSize);
+                options.algo, options.htSize, options.memFrameLimit, options.format, options.frameSize);
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
@@ -121,13 +127,15 @@ public class WordCountMain {
             if (idx < 0) {
                 throw new IllegalArgumentException("File split " + s + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+            File file = new File(s.substring(idx + 1));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(file));
+            fileSize += file.length();
         }
         return fSplits;
     }
 
     private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
-            int sbSize, String format, int frameSize) {
+            int frameLimit, String format, int frameSize) {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
@@ -144,40 +152,39 @@ public class WordCountMain {
         IOperatorDescriptor gBy;
         int[] keys = new int[] { 0 };
         if ("hash".equalsIgnoreCase(algo)) {
-            gBy = new HashGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }),
+            gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    groupResultDesc, htSize);
+                    new UTF8StringNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(1, false) }),
+                    groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
+                            new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+
             createPartitionConstraint(spec, gBy, outSplits);
             IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
-                    new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }));
+                    new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
         } else {
-            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                    .of(UTF8StringPointable.FACTORY) };
-            IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) ? new InMemorySortOperatorDescriptor(spec,
-                    keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc)
-                    : new ExternalSortOperatorDescriptor(spec, sbSize, keys,
+            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+            IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
+                    ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs,
+                            wordDesc)
+                    : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
                             new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
             createPartitionConstraint(spec, sorter, outSplits);
 
             IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
-                    new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }));
+                    new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
 
-            gBy = new PreclusteredGroupOperatorDescriptor(
-                    spec,
-                    keys,
+            gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     new MultiFieldsAggregatorFactory(
                             new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
@@ -188,8 +195,9 @@ public class WordCountMain {
         }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-        IOperatorDescriptor writer = "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec,
-                outSplitProvider, ",") : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
+                ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+                : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
         createPartitionConstraint(spec, writer, outSplits);
 
         IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);

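The heart of this change is in the hunk above: the one-pass HashGroupOperatorDescriptor is replaced by the memory-bounded ExternalGroupOperatorDescriptor, which takes an input-size estimate in bytes, a frame budget, separate partial and merge aggregators (the first runs over raw input tuples, the second over spilled partial results), and a HashSpillableTableFactory built from hash function families rather than plain hash function factories. A condensed sketch of that wiring, assuming only the API and imports that appear in this patch:

    // Minimal sketch of the memory-bounded word-count group-by, against the
    // 0.2.17-SNAPSHOT API as shown in this patch. groupResultDesc is the
    // {UTF8String, int} descriptor built in createJob().
    static IOperatorDescriptor boundedGroupBy(JobSpecification spec, int htSize, long fileSize, int frameLimit,
            RecordDescriptor groupResultDesc) {
        int[] keys = new int[] { 0 };
        return new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                new UTF8StringNormalizedKeyComputerFactory(),
                // partial aggregator: consumes raw input tuples, emits {word, count}
                new MultiFieldsAggregatorFactory(
                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
                // merge aggregator: consumes reloaded partial results, sums the counts in field 1
                new MultiFieldsAggregatorFactory(
                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),
                groupResultDesc, groupResultDesc,
                // a hash function family, presumably so each spill level can rehash with a fresh seed
                new HashSpillableTableFactory(
                        new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
    }
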
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index cd45536..a52d21e 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -27,44 +27,79 @@
     <version>0.2.17-SNAPSHOT</version>
   </parent>
 
-  <dependencies>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-data-std</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  	</dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>appassembler-maven-plugin</artifactId>
-        <version>1.3</version>
-        <executions>
-          <execution>
-            <configuration>
-              <programs>
-                <program>
-                  <mainClass>org.apache.hyracks.examples.tpch.client.Sort</mainClass>
-                  <name>tpchclient</name>
-                </program>
-              </programs>
-              <repositoryLayout>flat</repositoryLayout>
-              <repositoryName>lib</repositoryName>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>assemble</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-std</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-data-std</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>appassembler-maven-plugin</artifactId>
+                <version>1.3</version>
+                <executions>
+                    <execution>
+                        <id>sort</id>
+                        <configuration>
+                            <programs>
+                                <program>
+                                    <mainClass>org.apache.hyracks.examples.tpch.client.Sort</mainClass>
+                                    <name>sort</name>
+                                </program>
+                            </programs>
+                            <repositoryLayout>flat</repositoryLayout>
+                            <repositoryName>lib</repositoryName>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>group</id>
+                        <configuration>
+                            <programs>
+                                <program>
+                                    <mainClass>org.apache.hyracks.examples.tpch.client.Groupby</mainClass>
+                                    <name>group</name>
+                                </program>
+                            </programs>
+                            <repositoryLayout>flat</repositoryLayout>
+                            <repositoryName>lib</repositoryName>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>join</id>
+                        <configuration>
+                            <programs>
+                                <program>
+                                    <mainClass>org.apache.hyracks.examples.tpch.client.Join</mainClass>
+                                    <name>join</name>
+                                </program>
+                            </programs>
+                            <repositoryLayout>flat</repositoryLayout>
+                            <repositoryName>lib</repositoryName>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
index ac172fd..15d7f66 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
@@ -27,8 +27,12 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
@@ -56,12 +60,36 @@ public class Common {
             new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
             new UTF8StringSerializerDeserializer() });
 
-    static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+    static RecordDescriptor lineitemDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
+    static IValueParserFactory[] lineitemParserFactories = new IValueParserFactory[] {
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, };
+
+    static IValueParserFactory[] custParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE };
+    static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
 
     static FileSplit[] parseFileSplits(String fileSplits) {
         String[] splits = fileSplits.split(",");
@@ -77,6 +105,21 @@ public class Common {
         return fSplits;
     }
 
+    static FileSplit[] parseFileSplits(String fileSplits, int count) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+                    + count)));
+        }
+        return fSplits;
+    }
+
     static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {

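The new parseFileSplits(String, int) overload above suffixes every path with "_" plus the given count, presumably so that repeated runs (or steps) of a client can write to distinct files. A hypothetical call, with node name and path as placeholders:

    // Hypothetical usage of the new overload: one split spec, a distinct file per run.
    FileSplit[] run0 = Common.parseFileSplits("nc1:/tmp/tpch/out", 0); // writes nc1:/tmp/tpch/out_0
    FileSplit[] run1 = Common.parseFileSplits("nc1:/tmp/tpch/out", 1); // writes nc1:/tmp/tpch/out_1
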
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
new file mode 100644
index 0000000..2ef5835
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.examples.tpch.client;
+
+import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static org.apache.hyracks.examples.tpch.client.Common.lineitemDesc;
+import static org.apache.hyracks.examples.tpch.client.Common.lineitemParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
+
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+/**
+ * The application client for the performance tests of the groupby
+ * operator.
+ */
+public class Groupby {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+        public String inFileSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        @Option(name = "-input-tuples", usage = "Expected number of input tuples (used as the hash table size)", required = true)
+        public int htSize;
+
+        @Option(name = "-input-size", usage = "Physical input file size in bytes", required = true)
+        public long fileSize;
+
+        @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
+
+        @Option(name = "-frame-limit", usage = "Memory limit in frames (default: 4)", required = false)
+        public int frameLimit = 4;
+
+        @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
+        public boolean outPlain = true;
+
+        @Option(name = "-algo", usage = "The algorithm to be used: hash|sort", required = true)
+        public String algo = "hash";
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        if (args.length == 0) {
+            parser.printUsage(System.err);
+            return;
+        }
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job;
+
+        long start = System.currentTimeMillis();
+        job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits), options.htSize,
+                options.fileSize, options.frameLimit, options.frameSize, options.algo, options.outPlain);
+        if (job != null) {
+            System.out.println("CreateJobTime: " + (System.currentTimeMillis() - start));
+            start = System.currentTimeMillis();
+            JobId jobId = hcc.startJob(job);
+            hcc.waitForCompletion(jobId);
+            System.out.println("JobExecuteTime:" + (System.currentTimeMillis() - start));
+        }
+    }
+
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, long fileSize,
+            int frameLimit, int frameSize, String alg, boolean outPlain) {
+        JobSpecification spec = new JobSpecification(frameSize);
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+
+        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new DelimitedDataTupleParserFactory(lineitemParserFactories, '|'), lineitemDesc);
+
+        createPartitionConstraint(spec, fileScanner, inSplits);
+
+        // Output: each unique integer key with an integer count
+        RecordDescriptor outDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        // IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
+
+        // Specify the grouping key: the first (integer) field parsed during
+        // the scan. Uncomment the second entry to group on two fields.
+        int[] keys = new int[] { 0,
+                // 1
+        };
+
+        AbstractOperatorDescriptor grouper;
+
+        if (alg.equalsIgnoreCase("hash")) { // external hash group-by
+            grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+                    new IBinaryComparatorFactory[] {
+                            // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(keys.length, false) }),
+                    outDesc, outDesc, new HashSpillableTableFactory(
+                            new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }));
+
+            createPartitionConstraint(spec, grouper, outSplits);
+        } else if (alg.equalsIgnoreCase("sort")) {
+            grouper = new SortGroupByOperatorDescriptor(spec, frameLimit, keys, keys,
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new IBinaryComparatorFactory[] {
+                            // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(keys.length, true) }),
+                    outDesc, outDesc, false);
+
+            createPartitionConstraint(spec, grouper, outSplits);
+        } else {
+            System.err.println("unknown groupby algorithm: " + alg);
+            return null;
+        }
+        // Connect scanner with the grouper
+        IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] {
+                                // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+        spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+
+        AbstractSingleActivityOperatorDescriptor writer;
+
+        if (outPlain) {
+            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+        } else {
+            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        }
+
+        createPartitionConstraint(spec, writer, outSplits);
+
+        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(groupOutConn, grouper, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+}
\ No newline at end of file

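For reference, a hypothetical invocation of the new Groupby client. Every flag below corresponds to an @Option declared above; the host, node name, paths, and size figures are placeholders (the tuple count is the TPC-H scale-factor-1 lineitem cardinality):

    // Hedged driver sketch; all flags are declared in Groupby.Options.
    Groupby.main(new String[] {
            "-host", "127.0.0.1", "-port", "1098",
            "-infile-splits", "nc1:/tmp/tpch/lineitem.tbl",
            "-outfile-splits", "nc1:/tmp/tpch/lineitem_gby",
            "-input-tuples", "6001215", // expected tuple count, sizes the hash table
            "-input-size", "759000000", // physical bytes, drives the spill planning
            "-frame-limit", "4", "-frame-size", "32768",
            "-algo", "hash" });
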
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index fbfc5f5..507e1c7 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -18,26 +18,22 @@
  */
 package org.apache.hyracks.examples.tpch.client;
 
-import static org.apache.hyracks.examples.tpch.client.Common.*;
+import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static org.apache.hyracks.examples.tpch.client.Common.custParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
 
 import java.util.EnumSet;
 
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -45,11 +41,11 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -57,16 +53,21 @@ import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
 import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
 
 public class Join {
     private static class Options {
@@ -91,23 +92,23 @@ public class Join {
         @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
         public boolean profile = true;
 
-        @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+        @Option(name = "-table-size", usage = "Table size for in-memory hash join. (default: 8191)", required = false)
         public int tableSize = 8191;
 
-        @Option(name = "-algo", usage = "Join types", required = true)
+        @Option(name = "-algo", usage = "Join type: InMem|NestedLoop|Hybrid|Grace", required = true)
         public String algo;
 
         // For grace/hybrid hash join only
         @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
         public int memSize;
 
-        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
-        public int graceInputSize = 10;
+        @Option(name = "-input-size", usage = "Input size of the hybrid hash join", required = false)
+        public int graceInputSize = 100000;
 
-        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+        @Option(name = "-records-per-frame", usage = "Records per frame for hybrid hash join", required = false)
         public int graceRecordsPerFrame = 200;
 
-        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join (default: 1.2)", required = false)
         public double graceFactor = 1.2;
 
         // Whether group-by is processed after the join
@@ -121,6 +122,10 @@ public class Join {
     public static void main(String[] args) throws Exception {
         Options options = new Options();
         CmdLineParser parser = new CmdLineParser(options);
+        if (args.length == 0) {
+            parser.printUsage(System.err);
+            return;
+        }
         parser.parseArgument(args);
 
         IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
@@ -129,6 +134,9 @@ public class Join {
                 parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
                 options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
                 options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
+        if (job == null) {
+            return;
+        }
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
@@ -141,87 +149,76 @@ public class Join {
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
             FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
             double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+        long custFileSize = 0;
+        for (int i = 0; i < customerSplits.length; i++) {
+            custFileSize += customerSplits[i].getLocalFile().getFile().length();
+        }
 
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+        long orderFileSize = 0;
+        for (int i = 0; i < orderSplits.length; i++) {
+            orderFileSize += orderSplits[i].getLocalFile().getFile().length();
+        }
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), Common.ordersDesc);
+                new DelimitedDataTupleParserFactory(orderParserFactories, '|'), Common.ordersDesc);
         createPartitionConstraint(spec, ordScanner, orderSplits);
 
         FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), Common.custDesc);
+                new DelimitedDataTupleParserFactory(custParserFactories, '|'), Common.custDesc);
         createPartitionConstraint(spec, custScanner, customerSplits);
 
         IOperatorDescriptor join;
 
         if ("nestedloop".equalsIgnoreCase(algo)) {
-            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), Common.custOrderJoinDesc,
-                    memSize, false, null);
-
-        } else if ("gracehash".equalsIgnoreCase(algo)) {
-            join = new GraceHashJoinOperatorDescriptor(
-                    spec,
-                    memSize,
-                    graceInputSize,
-                    graceRecordsPerFrame,
-                    graceFactor,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
+            join = new NestedLoopJoinOperatorDescriptor(spec,
+                    new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                    Common.custOrderJoinDesc, memSize, false, null);
+
+        } else if ("inmem".equalsIgnoreCase(algo)) {
+            join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, null);
+                    Common.custOrderJoinDesc, tableSize, null);
 
-        } else if ("hybridhash".equalsIgnoreCase(algo)) {
-            join = new HybridHashJoinOperatorDescriptor(
-                    spec,
-                    memSize,
-                    graceInputSize,
-                    graceRecordsPerFrame,
-                    graceFactor,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
+        } else if ("hybrid".equalsIgnoreCase(algo)) {
+            join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,
+                    new int[] { 0 }, new int[] { 1 },
+                    new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    Common.custOrderJoinDesc,
+                    new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                    new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                    null);
+
+        } else if ("grace".equalsIgnoreCase(algo)) {
+            join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame, graceFactor,
+                    new int[] { 0 }, new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     Common.custOrderJoinDesc, null);
 
         } else {
-            join = new InMemoryHashJoinOperatorDescriptor(
-                    spec,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, 6000000, null);
+            System.err.println("unknown algorithm: " + algo);
+            return null;
         }
 
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 0);
 
         IOperatorDescriptor endingOp = join;
@@ -231,29 +228,33 @@ public class Join {
             RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                     new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
 
-            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
-                    spec,
-                    new int[] { 6 },
-                    new FieldHashPartitionComputerFactory(new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }),
+            ExternalGroupOperatorDescriptor gby = new ExternalGroupOperatorDescriptor(spec, tableSize,
+                    custFileSize + orderFileSize, new int[] { 6 }, memSize,
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    groupResultDesc, 16);
+                    new UTF8StringNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(1, false) }),
+                    groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
+                            new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+
             createPartitionConstraint(spec, gby, resultSplits);
 
             IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
-                    new FieldHashPartitionComputerFactory(new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }));
+                    new FieldHashPartitionComputerFactory(new int[] { 6 }, new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(joinGroupConn, join, 0, gby, 0);
 
             endingOp = gby;
         }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
-        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        IOperatorDescriptor writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+
         createPartitionConstraint(spec, writer, resultSplits);
 
         IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
@@ -262,63 +263,4 @@ public class Join {
         spec.addRoot(writer);
         return spec;
     }
-
-
-
-    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final IBinaryComparatorFactory bFactory;
-        private final int pos0;
-        private final int pos1;
-
-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
-            this.bFactory = bFactory;
-            this.pos0 = pos0;
-            this.pos1 = pos1;
-        }
-
-        @Override
-        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
-            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
-        }
-    }
-
-    static class JoinComparator implements ITuplePairComparator {
-
-        private final IBinaryComparator bComparator;
-        private final int field0;
-        private final int field1;
-
-        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
-            this.bComparator = bComparator;
-            this.field0 = field0;
-            this.field1 = field1;
-        }
-
-        @Override
-        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
-                throws HyracksDataException {
-            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
-            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
-            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
-            int fLen0 = fEnd0 - fStart0;
-
-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
-            int fLen1 = fEnd1 - fStart1;
-
-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
-            if (c != 0) {
-                return c;
-            }
-            return 0;
-        }
-    }
 }

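The JoinComparator and JoinComparatorFactory classes deleted at the bottom of this file are not gone; the same factory is now imported from org.apache.hyracks.dataflow.std.join. Its contract is exactly what the deleted code shows: compare field pos0 of the first tuple against field pos1 of the second. A short sketch of the two orientations the hybrid branch above passes (reading them as the two probe/build comparison directions is an interpretation, not something this patch states):

    // Both orientations of the customer/order key comparison, as handed to
    // OptimizedHybridHashJoinOperatorDescriptor above.
    IBinaryComparatorFactory cf = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
    ITuplePairComparatorFactory cmp01 = new JoinComparatorFactory(cf, 0, 1); // left field 0 vs right field 1
    ITuplePairComparatorFactory cmp10 = new JoinComparatorFactory(cf, 1, 0); // left field 1 vs right field 0
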
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index f2dd519..28e6dd9 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -42,6 +42,7 @@ import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -54,7 +55,6 @@ import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
 
 public class Sort {
     private static class Options {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
index 57e524a..5a7a4a7 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
@@ -1,8 +1,3 @@
-package org.apache.hyracks.storage.am.common.api;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,6 +16,12 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
  * specific language governing permissions and limitations
  * under the License.
  */
+
+package org.apache.hyracks.storage.am.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
 public interface ITwoPCIndexBulkLoader {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 1b1bee0..170950e 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -81,6 +81,7 @@ public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOut
 
                     FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
                             tb.getSize());
+
                 }
                 appender.write(writer, true);
             } catch (Throwable th) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index b677ee1..f796812 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.test.support;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -40,6 +42,8 @@ public class TestTaskContext implements IHyracksTaskContext {
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
 
+    private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
+
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
         this.jobletContext = jobletContext;
         this.taskId = taskId;
@@ -114,13 +118,13 @@ public class TestTaskContext implements IHyracksTaskContext {
     }
 
     @Override
-    public void setStateObject(IStateObject taskState) {
-
+    public synchronized void setStateObject(IStateObject taskState) {
+        stateObjectMap.put(taskState.getId(), taskState);
     }
 
     @Override
-    public IStateObject getStateObject(Object id) {
-        return null;
+    public synchronized IStateObject getStateObject(Object id) {
+        return stateObjectMap.get(id);
     }
 
     @Override

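With the two synchronized overrides above, TestTaskContext now round-trips task state instead of silently dropping it. A hedged test sketch: MockState and taskContext are hypothetical, and apart from getId(), which the put() call above relies on, the stubbed IStateObject methods are an assumption about that interface:

    // Hypothetical unit-test fragment against the fixed TestTaskContext.
    final class MockState implements IStateObject {
        private final Object id;
        MockState(Object id) { this.id = id; }
        @Override public Object getId() { return id; }
        @Override public JobId getJobId() { return null; }       // unused in this test
        @Override public long getMemoryOccupancy() { return 0; } // unused in this test
        @Override public void toBytes(DataOutput out) { }        // unused in this test
        @Override public void fromBytes(DataInput in) { }        // unused in this test
    }

    IStateObject state = new MockState("state-1");
    taskContext.setStateObject(state);                       // stored, no longer a no-op
    assert taskContext.getStateObject("state-1") == state;   // previously always null
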
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90f80a1..fe44a12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,6 +171,7 @@
            <exclude>**/ClusterControllerService/**</exclude>
            <exclude>**/target/**</exclude>
            <exclude>**/output/**</exclude>
            <exclude>**/*.iml</exclude>
            <exclude>**/*.prefs</exclude>
            <exclude>**/.classpath</exclude>

