asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wangs...@apache.org
Subject [3/3] asterixdb git commit: ASTERIXDB-1556, ASTERIXDB-1733: Hash Group By and Hash Join conform to the memory budget
Date Wed, 04 Jan 2017 23:46:31 GMT
ASTERIXDB-1556, ASTERIXDB-1733: Hash Group By and Hash Join conform to the memory budget

 - External Hash Group By and Hash Join now conform to the memory budget (compiler.groupmemory and compiler.joinmemory)
 - For Optimized Hybrid Hash Join, we calculate the expected hash table size when the build phase is done and
   try to spill one or more partitions if the free space can't afford the hash table size.
 - For External Hash Group By, the number of hash entries (hash table size) is calculated based on
   an estimation of the aggregated tuple size and possible hash values for the given field size in that tuple.
 - Garbage Collection feature has been added to SerializableHashTable. For external hash group-by,
   whenever we spill a data partition to the disk, we also check the ratio of garbage in the hash table.
   If it's greater than the given threshold, we conduct a GC on Hash Table.

Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1056
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/8b2aceeb
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/8b2aceeb
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/8b2aceeb

Branch: refs/heads/master
Commit: 8b2aceeb97c8f89f2898c0b35f38cc36d3cdda63
Parents: 1355c26
Author: Taewoo Kim <wangsaeu@yahoo.com>
Authored: Wed Jan 4 12:44:02 2017 -0800
Committer: Taewoo Kim <wangsaeu@yahoo.com>
Committed: Wed Jan 4 15:45:58 2017 -0800

----------------------------------------------------------------------
 .../rules/SetAsterixPhysicalOperatorsRule.java  |   1 -
 .../resources/asterix-build-configuration.xml   |   2 +-
 .../resources/asterix-build-configuration2.xml  |   2 +-
 .../resources/asterix-build-configuration3.xml  |   2 +-
 .../api/cluster_state_1/cluster_state_1.1.adm   |   2 +-
 .../cluster_state_1_full.1.adm                  |   2 +-
 .../cluster_state_1_less.1.adm                  |   2 +-
 .../algebricks/algebricks-core/pom.xml          |  11 +
 .../physical/ExternalGroupByPOperator.java      |  66 +-
 .../physical/HybridHashJoinPOperator.java       |   1 +
 .../physical/InMemoryHashJoinPOperator.java     |  10 +-
 .../physical/ExternalGroupByPOperatorTest.java  | 128 ++++
 .../SetAlgebricksPhysicalOperatorsRule.java     |   1 -
 .../algebricks/rewriter/util/JoinUtils.java     |   6 +-
 .../api/comm/IFrameTupleReversibleAppender.java |  34 +
 .../comm/io/FixedSizeFrameTupleAppender.java    |  30 +-
 .../dataflow/common/io/RunFileReader.java       |   4 +
 .../std/buffermanager/FrameBufferManager.java   |  62 ++
 .../FramePoolBackedFrameBufferManager.java      |  48 ++
 .../IPartitionedTupleBufferManager.java         |  13 +-
 .../ISimpleFrameBufferManager.java              |  42 ++
 .../PreferToSpillFullyOccupiedFramePolicy.java  |  40 +-
 .../buffermanager/TupleInFrameListAccessor.java |  53 ++
 .../VPartitionTupleBufferManager.java           |  73 +-
 .../std/group/HashSpillableTableFactory.java    |  99 ++-
 .../std/group/ISpillableTableFactory.java       |   4 +-
 .../ExternalGroupBuildOperatorNodePushable.java |   2 +-
 .../ExternalGroupOperatorDescriptor.java        |   7 +-
 .../join/HybridHashJoinOperatorDescriptor.java  |  15 +-
 .../dataflow/std/join/InMemoryHashJoin.java     |  72 +-
 .../InMemoryHashJoinOperatorDescriptor.java     |  53 +-
 .../std/join/OptimizedHybridHashJoin.java       | 284 ++++++--
 ...timizedHybridHashJoinOperatorDescriptor.java | 158 ++--
 .../std/structures/ISerializableTable.java      |  31 +-
 .../std/structures/SerializableHashTable.java   | 723 +++++++++++++------
 .../structures/SimpleSerializableHashTable.java | 542 ++++++++++++++
 .../structures/SerializableHashTableTest.java   |  22 +-
 .../SimpleSerializableHashTableTest.java        |  98 +++
 .../tests/integration/AggregationTest.java      |  12 +-
 .../TPCHCustomerOrderHashJoinTest.java          |  12 +-
 .../tests/unit/AbstractExternalGroupbyTest.java |  27 +-
 .../hyracks/examples/tpch/client/Join.java      |   4 +-
 42 files changed, 2279 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 9632aad..5aaf87b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -163,7 +163,6 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
                                             physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            physicalOptimizationConfig.getExternalGroupByTableSize(),
                                             (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
                                                     * physicalOptimizationConfig.getFrameSize());
                                     generateMergeAggregationExpressions(gby, context);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index bad6a35..8a96882 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -64,7 +64,7 @@
   </property>
   <property>
     <name>compiler.joinmemory</name>
-    <value>160KB</value>
+    <value>256KB</value>
   </property>
   <property>
     <name>storage.buffercache.pagesize</name>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
index 7b8d274..2989fa9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
@@ -64,7 +64,7 @@
     </property>
     <property>
         <name>compiler.joinmemory</name>
-        <value>160KB</value>
+        <value>256KB</value>
     </property>
     <property>
         <name>compiler.parallelism</name>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
index 85b0cdb..85881f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
@@ -64,7 +64,7 @@
     </property>
     <property>
         <name>compiler.joinmemory</name>
-        <value>160KB</value>
+        <value>256KB</value>
     </property>
     <property>
         <name>compiler.parallelism</name>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
index 1ebdc02..15b1baa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
@@ -14,7 +14,7 @@
         },
         "compiler.framesize": 32768,
         "compiler.groupmemory": 163840,
-        "compiler.joinmemory": 163840,
+        "compiler.joinmemory": 262144,
         "compiler.parallelism": 0,
         "compiler.pregelix.home": "~/pregelix",
         "compiler.sortmemory": 327680,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
index dbdf8f0..11e964c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
@@ -14,7 +14,7 @@
         },
         "compiler.framesize": 32768,
         "compiler.groupmemory": 163840,
-        "compiler.joinmemory": 163840,
+        "compiler.joinmemory": 262144,
         "compiler.parallelism": -1,
         "compiler.pregelix.home": "~/pregelix",
         "compiler.sortmemory": 327680,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
index fd268da..83f18f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
@@ -14,7 +14,7 @@
         },
         "compiler.framesize": 32768,
         "compiler.groupmemory": 163840,
-        "compiler.joinmemory": 163840,
+        "compiler.joinmemory": 262144,
         "compiler.parallelism": 3,
         "compiler.pregelix.home": "~/pregelix",
         "compiler.sortmemory": 327680,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-core/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index b77d567..c53a87b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -87,5 +87,16 @@
       <artifactId>commons-lang3</artifactId>
       <version>3.5</version>
     </dependency>
+    <dependency>
+      <groupId>com.e-movimento.tinytools</groupId>
+      <artifactId>privilegedaccessor</artifactId>
+      <version>1.2.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 7772620..8555ade 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -65,19 +65,18 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
 
-    private final int tableSize;
-    private final long fileSize;
+    private final long inputSize;
     private final int frameLimit;
     private List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
 
     public ExternalGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
-            int tableSize, long fileSize) {
-        this.tableSize = tableSize;
+            long fileSize) {
         this.frameLimit = frameLimit;
-        this.fileSize = fileSize;
+        this.inputSize = fileSize;
         computeColumnSet(gbyList);
     }
 
@@ -256,7 +255,14 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
 
         INormalizedKeyComputerFactory normalizedKeyFactory = JobGenHelper
                 .variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
-        ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+
+        // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
+        int memoryBudgetInBytes = context.getFrameSize() * frameLimit;
+        int groupByColumnsCount = gby.getGroupByList().size() + numFds;
+        int hashTableSize = calculateGroupByTableCardinality(memoryBudgetInBytes, groupByColumnsCount,
+                context.getFrameSize());
+
+        ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
                 keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
                 recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
         contributeOpDesc(builder, gby, gbyOpDesc);
@@ -275,4 +281,52 @@ public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
     public boolean expensiveThanMaterialization() {
         return true;
     }
+
+    /**
+     * Estimates the number of hash entries (the cardinality) of the hash table used by an external group-by.
+     * Assuming 4-byte fields, it takes min(# of possible hash values, # of possible tuples in the data table) and
+     * then shrinks the result so that the hash table only takes its proportional share of the memory budget.
+     * The group-by table is assumed to consist of a hash table that stores hash value of tuple pointer and a data
+     * table that actually stores the aggregated tuple. See https://issues.apache.org/jira/browse/ASTERIXDB-1556
+     *
+     * @param memoryBudgetByteSize the memory budget of the group-by operator, in bytes
+     * @param numberOfGroupByColumns the number of group-by columns; each one is assumed to be a 4-byte field
+     * @param frameSize the frame size in bytes, passed on to SerializableHashTable.getExpectedTableByteSize()
+     * @return group-by table size (the cardinality of group-by table), capped at Integer.MAX_VALUE
+     */
+    public static int calculateGroupByTableCardinality(long memoryBudgetByteSize, int numberOfGroupByColumns,
+            int frameSize) {
+        // Estimates a minimum tuple size with n fields:
+        // (4:tuple offset in a frame, 4n:each field offset in a tuple, 4n:each field size 4 bytes)
+        int tupleByteSize = 4 + 8 * numberOfGroupByColumns;
+
+        // Maximum number of tuples that fit in the budget if every tuple has the minimum size estimated above
+        long maxNumberOfTuplesInDataTable = memoryBudgetByteSize / tupleByteSize;
+
+        // To calculate possible hash values, this counts the number of bits.
+        // We assume that each field consists of 4 bytes.
+        // Also, too high range that is greater than Long.MAX_VALUE (64 bits) is not necessary for our calculation.
+        // And, this should not generate negative numbers when shifting the number (hence the cap at 61 bits).
+        int numberOfBits = Math.min(61, numberOfGroupByColumns * 4 * 8);
+
+        // Possible number of unique hash entries. NOTE(review): 2L << n is 2^(n + 1), not 2^n - confirm intent.
+        long possibleNumberOfHashEntries = 2L << numberOfBits;
+
+        // Between # of entries in Data table and # of possible hash values, we choose the smaller one.
+        long groupByTableCardinality = Math.min(possibleNumberOfHashEntries, maxNumberOfTuplesInDataTable);
+        long groupByTableByteSize = SerializableHashTable.getExpectedTableByteSize(groupByTableCardinality, frameSize);
+
+        // Gets the ratio of hash-table size in the total size (hash + data table, the latter taken as the budget).
+        double hashTableRatio = (double) groupByTableByteSize / (groupByTableByteSize + memoryBudgetByteSize);
+
+        // Gets the hash table byte size by applying the ratio that we have calculated to the memory budget.
+        long finalGroupByTableByteSize = (long) (hashTableRatio * memoryBudgetByteSize);
+
+        long finalGroupByTableCardinality = finalGroupByTableByteSize
+                / SerializableHashTable.getExpectedByteSizePerHashValue();
+
+        // The maximum cardinality of a hash table: Integer.MAX_VALUE
+        return finalGroupByTableCardinality > Integer.MAX_VALUE ? Integer.MAX_VALUE
+                : (int) finalGroupByTableCardinality;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 248bc4f..ee0bec7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -61,6 +61,7 @@ import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescr
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
+    // The maximum number of in-memory frames that this hash join can use.
     private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
     private final int aveRecordsPerFrame;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index fff0fb4..a1d496d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -50,15 +50,19 @@ import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
 
     private final int tableSize;
+    // The maximum number of in-memory frames that this hash join can use.
+    private final int memSizeInFrames;
 
     /**
      * builds on the first operator and probes on the second.
      */
 
     public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
-            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize,
+            int memSizeInFrames) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.tableSize = tableSize;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -106,7 +110,7 @@ public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
         switch (kind) {
             case INNER: {
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
+                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory, memSizeInFrames);
                 break;
             }
             case LEFT_OUTER: {
@@ -116,7 +120,7 @@ public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
                 }
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
                         comparatorFactories, predEvaluatorFactory, recDescriptor, true, nonMatchWriterFactories,
-                        tableSize);
+                        tableSize, memSizeInFrames);
                 break;
             }
             default: {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
new file mode 100644
index 0000000..09b8ec0
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
@@ -0,0 +1,128 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.extensions.PA;
+
+public class ExternalGroupByPOperatorTest {
+
+    @Test
+    public void testCalculateGroupByTableCardinality() throws Exception {
+
+        // Creates a dummy variable and an expression that are only needed to construct the operator.
+        LogicalVariable v = new LogicalVariable(0);
+        MutableObject<ILogicalExpression> e = new MutableObject<ILogicalExpression>(new VariableReferenceExpression(v));
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = new ArrayList<>();
+        gbyList.add(new Pair<>(v, e));
+        ExternalGroupByPOperator eGByOp = new ExternalGroupByPOperator(gbyList, 0, 0);
+
+        // Test 1: memory size: 512 bytes, frame size: 256 bytes, 1 column group-by
+        long memoryBudgetInBytes = 512;
+        int numberOfGroupByColumns = 1;
+        int frameSize = 256;
+        int resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 9);
+
+        // Sets the frame size to 128 KB for the remaining test cases.
+        frameSize = 128 * 1024;
+
+        // Test 2: memory size: 1 MB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 19660);
+
+        // Test 3: memory size: 100 MB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 100;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 1937883);
+
+        // Test 4: memory size: 1 GB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 1024;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 19841178);
+
+        // Test 5: memory size: 10 GB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 1024 * 10L;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 198409112);
+
+        // Test 6: memory size: 100 GB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 1024 * 100L;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 1962753871);
+
+        // Test 7: memory size: 1 TB, frame size: 128 KB, 1 column group-by
+        // The cardinality is expected to be capped at Integer.MAX_VALUE since the budget is this large.
+        memoryBudgetInBytes = 1024 * 1024 * 1024 * 1024L;
+        frameSize = 128 * 1024;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 2147483647);
+
+        // Test 8: memory size: 1 MB, frame size: 128 KB, 2 columns group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        numberOfGroupByColumns = 2;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 16681);
+
+        // Test 9: memory size: 1 MB, frame size: 128 KB, 3 columns group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        numberOfGroupByColumns = 3;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 15176);
+
+        // Test 10: memory size: 1 MB, frame size: 128 KB, 4 columns group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        numberOfGroupByColumns = 4;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 13878);
+
+        // Test 11: memory size: 32 MB, frame size: 128 KB, 4 columns group-by
+        memoryBudgetInBytes = 1024 * 1024 * 32L;
+        numberOfGroupByColumns = 4;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 408503);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 5240781..6fdcfdf 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -167,7 +167,6 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
                                             physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            physicalOptimizationConfig.getExternalGroupByTableSize(),
                                             (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
                                                     * physicalOptimizationConfig.getFrameSize());
                                     op.setPhysicalOperator(externalGby);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3332836..d66e6fc 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -24,7 +24,6 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -116,8 +115,9 @@ public class JoinUtils {
                 AlgebricksConfig.ALGEBRICKS_LOGGER
                         .fine("// HybridHashJoin inner branch " + opBuild + " fits in memory\n");
                 // maintains the local properties on the probe side
-                op.setPhysicalOperator(new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(),
-                        hhj.getKeysLeftBranch(), hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
+                op.setPhysicalOperator(
+                        new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(), hhj.getKeysLeftBranch(),
+                                hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2, hhj.getMemSizeInFrames()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java
new file mode 100644
index 0000000..a74f6b8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.comm;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for tuple appenders whose last append action can be canceled.
+ */
+public interface IFrameTupleReversibleAppender {
+
+    /**
+     * Cancels the effect of the last append operation. i.e. undoes the last append operation.
+     */
+    boolean cancelAppend() throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
index 60c6e6d..f1ea839 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
@@ -19,9 +19,13 @@
 
 package org.apache.hyracks.dataflow.common.comm.io;
 
+import org.apache.hyracks.api.comm.FrameConstants;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameTupleReversibleAppender;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.IntSerDeUtils;
 
-public class FixedSizeFrameTupleAppender extends FrameTupleAppender {
+public class FixedSizeFrameTupleAppender extends FrameTupleAppender implements IFrameTupleReversibleAppender {
     @Override
     protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
         if (hasEnoughSpace(fieldCount, dataLength)) {
@@ -29,4 +33,26 @@ public class FixedSizeFrameTupleAppender extends FrameTupleAppender {
         }
         return false;
     }
-}
+
+    /**
+     * Cancels the most recent append operation, i.e., decreases the tuple count and resets the data end offset.
+     */
+    @Override
+    public boolean cancelAppend() throws HyracksDataException {
+        // Decreases tupleCount by one.
+        tupleCount = IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()));
+        if (tupleCount == 0) {
+            // There is no inserted tuple in the given frame. This should not happen.
+            return false;
+        }
+        tupleCount = tupleCount - 1;
+
+        // Resets tupleCount and DataEndOffset.
+        IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+        tupleDataEndOffset = tupleCount == 0 ? FrameConstants.TUPLE_START_OFFSET
+                : IntSerDeUtils.getInt(array,
+                        FrameHelper.getTupleCountOffset(frame.getFrameSize()) - tupleCount * FrameConstants.SIZE_LEN);
+        return true;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index fb160f0..71ac6a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -88,4 +88,8 @@ public class RunFileReader implements IFrameReader {
     public long getFileSize() {
         return size;
     }
+
+    public void setDeleteAfterClose(boolean deleteAfterClose) {
+        this.deleteAfterClose = deleteAfterClose;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java
new file mode 100644
index 0000000..700500b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * General Frame based buffer manager class
+ */
+public class FrameBufferManager implements IFrameBufferManager {
+
+    ArrayList<ByteBuffer> buffers = new ArrayList<>();
+
+    @Override
+    public void reset() throws HyracksDataException {
+        buffers.clear();
+    }
+
+    @Override
+    public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
+        returnedInfo.reset(buffers.get(frameIndex), 0, buffers.get(frameIndex).capacity());
+        return returnedInfo;
+    }
+
+    @Override
+    public int getNumFrames() {
+        return buffers.size();
+    }
+
+    @Override
+    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+        buffers.add(frame);
+        return buffers.size() - 1;
+    }
+
+    @Override
+    public void close() {
+        buffers = null;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java
new file mode 100644
index 0000000..9808797
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This is a simple frame based buffer manager that uses a deallocatable frame pool.
+ * We assume that the list of assigned buffers is managed by other classes that call the methods of this class.
+ */
+public class FramePoolBackedFrameBufferManager implements ISimpleFrameBufferManager {
+
+    private final IDeallocatableFramePool framePool;
+
+    public FramePoolBackedFrameBufferManager(IDeallocatableFramePool framePool) {
+        this.framePool = framePool;
+    }
+
+    @Override
+    public ByteBuffer acquireFrame(int frameSize) throws HyracksDataException {
+        return framePool.allocateFrame(frameSize);
+    }
+
+    @Override
+    public void releaseFrame(ByteBuffer frame) {
+        framePool.deAllocateBuffer(frame);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index a0a9ab0..6c08be2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -59,7 +59,7 @@ public interface IPartitionedTupleBufferManager {
     /**
      * Insert tuple {@code tupleId} from the {@code tupleAccessor} into the given partition.
      * The returned handle is written into the tuplepointer
-     * 
+     *
      * @param partition
      *            the id of the partition to insert the tuple
      * @param tupleAccessor
@@ -75,8 +75,13 @@ public interface IPartitionedTupleBufferManager {
             throws HyracksDataException;
 
     /**
+     * Cancels the effect of last insertTuple() operation. i.e. undoes the last insertTuple() operation.
+     */
+    void cancelInsertTuple(int partition) throws HyracksDataException;
+
+    /**
      * Reset to the initial states. The previous allocated resources won't be released in order to be used in the next round.
-     * 
+     *
      * @throws HyracksDataException
      */
     void reset() throws HyracksDataException;
@@ -93,7 +98,7 @@ public interface IPartitionedTupleBufferManager {
      * This partition will not be cleared.
      * Currently it is used by Join where we flush the inner partition to the join (as a frameWriter),
      * but we will still keep the inner for the next outer partition.
-     * 
+     *
      * @param pid
      * @param writer
      * @throws HyracksDataException
@@ -102,7 +107,7 @@ public interface IPartitionedTupleBufferManager {
 
     /**
      * Clear the memory occupation of the particular partition.
-     * 
+     *
      * @param partition
      * @throws HyracksDataException
      */

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java
new file mode 100644
index 0000000..b0addfd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Manages the buffer space in the unit of frame.
+ * This buffer manager is suitable for a structure that manages
+ * the list of assigned frames on its own (e.g., SerializableHashTable class).
+ */
+public interface ISimpleFrameBufferManager {
+
+    /**
+     * Gets a frame from this buffer manager.
+     */
+    public ByteBuffer acquireFrame(int frameSize) throws HyracksDataException;
+
+    /**
+     * Releases a frame to this buffer manager.
+     */
+    public void releaseFrame(ByteBuffer frame);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index f9387a9..12985c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -30,32 +30,34 @@ public class PreferToSpillFullyOccupiedFramePolicy {
 
     private final IPartitionedTupleBufferManager bufferManager;
     private final BitSet spilledStatus;
-    private final int minFrameSize;
 
-    public PreferToSpillFullyOccupiedFramePolicy(IPartitionedTupleBufferManager bufferManager, BitSet spilledStatus,
-            int minFrameSize) {
+    public PreferToSpillFullyOccupiedFramePolicy(IPartitionedTupleBufferManager bufferManager, BitSet spilledStatus) {
         this.bufferManager = bufferManager;
         this.spilledStatus = spilledStatus;
-        this.minFrameSize = minFrameSize;
     }
 
+    /**
+     * This method tries to find a victim partition.
+     * We want to keep in-memory partitions (not spilled to the disk yet) as long as possible to reduce the overhead
+     * of writing to and reading from the disk.
+     * If the given partition contains one or more tuples, then try to spill the given partition.
+     * If not, try to flush another in-memory partition.
+     * Note: right now, the createAtMostOneFrameForSpilledPartitionConstrain we are using for a spilled partition
+     * enforces that the maximum number of frames for a spilled partition is 1.
+     */
     public int selectVictimPartition(int failedToInsertPartition) {
-        // To avoid flush the half-full frame, it's better to spill itself.
+        // To avoid flushing another partition with the last half-full frame, it's better to spill the given partition
+        // since one partition needs to be spilled to the disk anyway. Another reason is that we know that
+        // the last frame in this partition is full.
         if (bufferManager.getNumTuples(failedToInsertPartition) > 0) {
             return failedToInsertPartition;
         }
-        int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage();
-        int maxToSpillPartSize = 0;
-        // if we couldn't find the already spilled partition, or it is too small to flush that one,
-        // try to flush an in memory partition.
-        if (partitionToSpill < 0
-                || (maxToSpillPartSize = bufferManager.getPhysicalSize(partitionToSpill)) == minFrameSize) {
-            int partitionInMem = findInMemPartitionWithMaxMemoryUsage();
-            if (partitionInMem >= 0 && bufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
-                partitionToSpill = partitionInMem;
-            }
-        }
-        return partitionToSpill;
+        // If the given partition doesn't contain any tuple in memory, try to flush a different in-memory partition.
+        // We are not trying to steal a frame from another spilled partition since, once spilled, a partition can
+        // have only one frame and we don't know whether the frame is fully occupied or not.
+        // TODO: Once we change this policy (spilled partition can have only one frame in memory),
+        //       we need to revise this method, too.
+        return findInMemPartitionWithMaxMemoryUsage();
     }
 
     public int findInMemPartitionWithMaxMemoryUsage() {
@@ -80,8 +82,8 @@ public class PreferToSpillFullyOccupiedFramePolicy {
     }
 
     /**
-     * Create an constrain for the already spilled partition that it can only use at most one frame.
-     * 
+     * Create a constrain for the already spilled partition that it can only use at most one frame.
+     *
      * @param spillStatus
      * @return
      */

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java
new file mode 100644
index 0000000..2c9d24a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
+import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * tuple accessor class for a frame in a frame list using a tuple pointer.
+ */
+public class TupleInFrameListAccessor extends AbstractTuplePointerAccessor {
+
+    private IAppendDeletableFrameTupleAccessor bufferAccessor;
+    private List<ByteBuffer> bufferFrames;
+
+    public TupleInFrameListAccessor(RecordDescriptor rd, List<ByteBuffer> bufferFrames) {
+        bufferAccessor = new DeletableFrameTupleAppender(rd);
+        this.bufferFrames = bufferFrames;
+    }
+
+    @Override
+    IFrameTupleAccessor getInnerAccessor() {
+        return bufferAccessor;
+    }
+
+    @Override
+    void resetInnerAccessor(TuplePointer tuplePointer) {
+        bufferAccessor.reset(bufferFrames.get(tuplePointer.getFrameIndex()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4d4f279..4578c2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.dataflow.std.buffermanager;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hyracks.api.comm.FixedSizeFrame;
@@ -54,6 +53,18 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
     private BufferInfo tempInfo;
     private final IPartitionedMemoryConstrain constrain;
 
+    // In case where a frame pool is shared by one or more buffer manager(s), it can be provided from the caller.
+    public VPartitionTupleBufferManager(IPartitionedMemoryConstrain constrain, int partitions,
+            IDeallocatableFramePool framePool) throws HyracksDataException {
+        this.constrain = constrain;
+        this.framePool = framePool;
+        this.partitionArray = new IFrameBufferManager[partitions];
+        this.numTuples = new int[partitions];
+        this.appendFrame = new FixedSizeFrame();
+        this.appender = new FixedSizeFrameTupleAppender();
+        this.tempInfo = new BufferInfo(null, -1, -1);
+    }
+
     public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain,
             int partitions, int frameLimitInBytes) throws HyracksDataException {
         this.constrain = constrain;
@@ -146,6 +157,17 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
                 tupleAccessor.getTupleStartOffset(tupleId), tupleAccessor.getTupleLength(tupleId), pointer);
     }
 
+    @Override
+    public void cancelInsertTuple(int partition) throws HyracksDataException {
+        int fid = getLastBuffer(partition);
+        if (fid < 0) {
+            throw new HyracksDataException("Couldn't get the last frame for the given partition.");
+        }
+        partitionArray[partition].getFrame(fid, tempInfo);
+        deleteTupleFromBuffer(tempInfo);
+        numTuples[partition]--;
+    }
+
     private static int calculateActualSize(int[] fieldEndOffsets, int size) {
         if (fieldEndOffsets != null) {
             return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
@@ -200,11 +222,25 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
         return -1;
     }
 
+    private void deleteTupleFromBuffer(BufferInfo bufferInfo) throws HyracksDataException {
+        if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
+            appendFrame.reset(bufferInfo.getBuffer());
+            appender.reset(appendFrame, false);
+        }
+        if (!appender.cancelAppend()) {
+            throw new HyracksDataException("Undoing the last insertion in the given frame couldn't be done.");
+        }
+    }
+
     private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
         if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
-            partitionArray[partition] = new PartitionFrameBufferManager();
+            partitionArray[partition] = new FrameBufferManager();
             return createNewBuffer(partition, actualSize);
         }
+        return getLastBuffer(partition);
+    }
+
+    private int getLastBuffer(int partition) throws HyracksDataException {
         return partitionArray[partition].getNumFrames() - 1;
     }
 
@@ -219,39 +255,6 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
         Arrays.fill(partitionArray, null);
     }
 
-    private static class PartitionFrameBufferManager implements IFrameBufferManager {
-
-        ArrayList<ByteBuffer> buffers = new ArrayList<>();
-
-        @Override
-        public void reset() throws HyracksDataException {
-            buffers.clear();
-        }
-
-        @Override
-        public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
-            returnedInfo.reset(buffers.get(frameIndex), 0, buffers.get(frameIndex).capacity());
-            return returnedInfo;
-        }
-
-        @Override
-        public int getNumFrames() {
-            return buffers.size();
-        }
-
-        @Override
-        public int insertFrame(ByteBuffer frame) throws HyracksDataException {
-            buffers.add(frame);
-            return buffers.size() - 1;
-        }
-
-        @Override
-        public void close() {
-            buffers = null;
-        }
-
-    }
-
     @Override
     public ITuplePointerAccessor getTuplePointerAccessor(final RecordDescriptor recordDescriptor) {
         return new AbstractTuplePointerAccessor() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 34fdc48..cd79a30 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -36,7 +36,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
 import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
@@ -51,6 +55,10 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
     private static final double FUDGE_FACTOR = 1.1;
     private static final long serialVersionUID = 1L;
     private final IBinaryHashFunctionFamily[] hashFunctionFamilies;
+    private static final int MIN_DATA_TABLE_FRAME_LIMT = 1;
+    private static final int MIN_HASH_TABLE_FRAME_LIMT = 2;
+    private static final int OUTPUT_FRAME_LIMT = 1;
+    private static final int MIN_FRAME_LIMT = MIN_DATA_TABLE_FRAME_LIMT + MIN_HASH_TABLE_FRAME_LIMT + OUTPUT_FRAME_LIMT;
 
     public HashSpillableTableFactory(IBinaryHashFunctionFamily[] hashFunctionFamilies) {
         this.hashFunctionFamilies = hashFunctionFamilies;
@@ -62,11 +70,15 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
             final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
             RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
             final int seed) throws HyracksDataException {
-        if (framesLimit < 2) {
-            throw new HyracksDataException("The frame limit is too small to partition the data");
-        }
         final int tableSize = suggestTableSize;
 
+        // For HashTable, we need to have at least two frames (one for header and one for content).
+        // For DataTable, we need to have at least one frame.
+        // For the output, we need to have at least one frame.
+        if (framesLimit < MIN_FRAME_LIMT) {
+            throw new HyracksDataException("The given frame limit is too small to partition the data.");
+        }
+
         final int[] intermediateResultKeys = new int[keyFields.length];
         for (int i = 0; i < keyFields.length; i++) {
             intermediateResultKeys[i] = i;
@@ -78,6 +90,12 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
         final ITuplePartitionComputer tpc = new FieldHashPartitionComputerFamily(keyFields, hashFunctionFamilies)
                 .createPartitioner(seed);
 
+        // For calculating hash value for the already aggregated tuples (not incoming tuples)
+        // This computer is required to calculate the hash value of an aggregated tuple
+        // while doing the garbage collection work on Hash Table.
+        final ITuplePartitionComputer tpcIntermediate = new FieldHashPartitionComputerFamily(intermediateResultKeys,
+                hashFunctionFamilies).createPartitioner(seed);
+
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
                 outRecordDescriptor, keyFields, intermediateResultKeys, null);
 
@@ -86,31 +104,42 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
         final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         //TODO(jf) research on the optimized partition size
-        final int numPartitions = getNumOfPartitions((int) (inputDataBytesSize / ctx.getInitialFrameSize()),
-                framesLimit - 1);
+        long memoryBudget = Math.max(MIN_DATA_TABLE_FRAME_LIMT + MIN_HASH_TABLE_FRAME_LIMT,
+                framesLimit - OUTPUT_FRAME_LIMT - MIN_HASH_TABLE_FRAME_LIMT);
+
+        final int numPartitions = getNumOfPartitions(inputDataBytesSize / ctx.getInitialFrameSize(), memoryBudget);
         final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine(
-                    "create hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize + "  partitions:"
+                    "created hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize + "  #partitions:"
                     + numPartitions);
         }
 
         final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
-        final ISerializableTable hashTableForTuplePointer = new SerializableHashTable(tableSize, ctx);
-
         return new ISpillableTable() {
 
             private final TuplePointer pointer = new TuplePointer();
             private final BitSet spilledSet = new BitSet(numPartitions);
-            final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(ctx,
+            // This frame pool will be shared by both data table and hash table.
+            private final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
+                    framesLimit * ctx.getInitialFrameSize());
+            // buffer manager for hash table
+            private final ISimpleFrameBufferManager bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(
+                    framePool);
+
+            private final ISerializableTable hashTableForTuplePointer = new SerializableHashTable(tableSize, ctx,
+                    bufferManagerForHashTable);
+
+            // buffer manager for data table
+            final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(
                     PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledSet),
-                    numPartitions, framesLimit * ctx.getInitialFrameSize());
+                    numPartitions, framePool);
 
             final ITuplePointerAccessor bufferAccessor = bufferManager.getTuplePointerAccessor(outRecordDescriptor);
 
             private final PreferToSpillFullyOccupiedFramePolicy spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(
-                    bufferManager, spilledSet, ctx.getInitialFrameSize());
+                    bufferManager, spilledSet);
 
             private final FrameTupleAppender outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));
 
@@ -125,6 +154,17 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                 for (int p = getFirstEntryInHashTable(partition); p < getLastEntryInHashTable(partition); p++) {
                     hashTableForTuplePointer.delete(p);
                 }
+
+                // Checks whether the garbage collection is required and conducts a garbage collection if so.
+                if (hashTableForTuplePointer.isGarbageCollectionNeeded()) {
+                    int numberOfFramesReclaimed = hashTableForTuplePointer.collectGarbage(bufferAccessor,
+                            tpcIntermediate);
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Garbage Collection on Hash table is done. Deallocated frames:"
+                                + numberOfFramesReclaimed);
+                    }
+                }
+
                 bufferManager.clearPartition(partition);
             }
 
@@ -152,20 +192,34 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                         return true;
                     }
                 }
-
                 return insertNewAggregateEntry(entryInHashTable, accessor, tIndex);
             }
 
+            /**
+             * Inserts a new aggregate entry into the data table and hash table.
+             * This insertion must be an atomic operation. We cannot have a partial success or failure.
+             * So, if an insertion succeeds on the data table and the same insertion on the hash table fails, then
+             * we need to revert the effect of data table insertion.
+             */
             private boolean insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, int tIndex)
                     throws HyracksDataException {
                 initStateTupleBuilder(accessor, tIndex);
                 int pid = getPartition(entryInHashTable);
 
+                // Insertion to the data table
                 if (!bufferManager.insertTuple(pid, stateTupleBuilder.getByteArray(),
                         stateTupleBuilder.getFieldEndOffsets(), 0, stateTupleBuilder.getSize(), pointer)) {
                     return false;
                 }
-                hashTableForTuplePointer.insert(entryInHashTable, pointer);
+
+                // Insertion to the hash table
+                if (!hashTableForTuplePointer.insert(entryInHashTable, pointer)) {
+                    // To preserve the atomicity of this method, we need to undo the effect
+                    // of the above bufferManager.insertTuple() call since the given insertion has failed.
+                    bufferManager.cancelInsertTuple(pid);
+                    return false;
+                }
+
                 return true;
             }
 
@@ -240,25 +294,30 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
     }
 
     /**
-     * Calculate the number of partitions for Data table. The formula is from Shapiro's paper -
+     * Calculates the number of partitions for Data table. The formula is from Shapiro's paper -
      * http://cs.stanford.edu/people/chrismre/cs345/rl/shapiro.pdf. Check the page 249 for more details.
      * If the required number of frames is greater than the number of available frames, we make sure that
      * at least two partitions will be created. Also, if the number of partitions is greater than the memory budget,
      * we may not allocate at least one frame for each partition in memory. So, we also deal with those cases
      * at the final part of the method.
+     * The maximum number of partitions is limited to Integer.MAX_VALUE.
      */
-    private int getNumOfPartitions(int nubmerOfInputFrames, int frameLimit) {
+    private int getNumOfPartitions(long nubmerOfInputFrames, long frameLimit) {
         if (frameLimit >= nubmerOfInputFrames * FUDGE_FACTOR) {
-            return 1; // all in memory, we will create a big partition.
+            // all in memory, we will create two big partitions. We set 2 (not 1) to avoid the corner case
+            // where the only partition may be spilled to the disk. This may happen since this formula doesn't consider
+            // the hash table size. If this is the case, we will have an infinite loop - keep spilling the same
+            // partition again and again.
+            return 2;
         }
-        int numberOfPartitions = (int) (Math
+        long numberOfPartitions = (long) (Math
                 .ceil((nubmerOfInputFrames * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
         numberOfPartitions = Math.max(2, numberOfPartitions);
         if (numberOfPartitions > frameLimit) {
-            numberOfPartitions = (int) Math.ceil(Math.sqrt(nubmerOfInputFrames * FUDGE_FACTOR));
-            return Math.max(2, Math.min(numberOfPartitions, frameLimit));
+            numberOfPartitions = (long) Math.ceil(Math.sqrt(nubmerOfInputFrames * FUDGE_FACTOR));
+            return (int) Math.min(Math.max(2, Math.min(numberOfPartitions, frameLimit)), Integer.MAX_VALUE);
         }
-        return numberOfPartitions;
+        return (int) Math.min(numberOfPartitions, Integer.MAX_VALUE);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
index dbe6858..629f211 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -27,8 +27,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ISpillableTableFactory extends Serializable {
-    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize, int[] keyFields,
-            IBinaryComparator[] comparatorFactories, INormalizedKeyComputer firstKeyNormalizerFactory,
+    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize,
+            int[] keyFields, IBinaryComparator[] comparatorFactories, INormalizedKeyComputer firstKeyNormalizerFactory,
             IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int framesLimit, int seed) throws HyracksDataException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 852a160..7d10802 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -106,7 +106,7 @@ public class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSi
 
     @Override
     public void close() throws HyracksDataException {
-        if (isFailed) {
+        if (isFailed && state.getRuns() != null) {
             for (int i = 0; i < state.getRuns().length; i++) {
                 RunFileWriter run = state.getRuns()[i];
                 if (run != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 433b75d..4e0724c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -65,12 +65,13 @@ public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor
             RecordDescriptor outRecordDesc, ISpillableTableFactory spillableTableFactory) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
-        if (framesLimit <= 1) {
+        if (framesLimit <= 3) {
             /**
-             * Minimum of 2 frames: 1 for input records, and 1 for output
+             * Minimum of 4 frames: 1 for input records, 2 for hash table (1 header and 1 content), and 1 for output
              * aggregation results.
              */
-            throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
+            throw new IllegalStateException(
+                    "Frame limit for the External Group Operator should at least be 4, but it is " + framesLimit + "!");
         }
         this.partialAggregatorFactory = partialAggregatorFactory;
         this.intermediateAggregateFactory = intermediateAggregateFactory;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8b2aceeb/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 4354367..ad14bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -58,7 +58,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -308,10 +308,11 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
                     int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                    ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
-                            isLeftOuter, nullWriters1, table, predEvaluator);
+                            new FrameTupleAccessor(rd1), rd1, hpc1,
+                            new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
+                            predEvaluator, null);
                     bufferForPartitions = new IFrame[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
@@ -492,7 +493,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                             } else {
                                 tableSize = (int) (memsize * recordsPerFrame * factor);
                             }
-                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                            ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
                             for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
                                 RunFileWriter buildWriter = buildWriters[partitionid];
                                 RunFileWriter probeWriter = probeWriters[partitionid];
@@ -501,9 +502,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                                 }
                                 table.reset();
                                 InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
-                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
+                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), rd1, hpcRep1,
                                         new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                        nullWriters1, table, predEvaluator);
+                                        nullWriters1, table, predEvaluator, null);
 
                                 if (buildWriter != null) {
                                     RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();


Mime
View raw message