asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject asterixdb git commit: [ASTERIXDB-1812][RT] Budget the memory usage for pre-clustered group-by.
Date Tue, 22 Aug 2017 04:41:05 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master de892d7c0 -> 6ea45c88a


[ASTERIXDB-1812][RT] Budget the memory usage for pre-clustered group-by.

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Let the pre-clustered group-by take the group-by memory budget into account.

Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1940
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/6ea45c88
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/6ea45c88
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/6ea45c88

Branch: refs/heads/master
Commit: 6ea45c88abc7fab74d9f322b15d95c2d1420a10d
Parents: de892d7
Author: Yingyi Bu <yingyi@couchbase.com>
Authored: Mon Aug 21 16:59:55 2017 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Mon Aug 21 21:40:42 2017 -0700

----------------------------------------------------------------------
 .../rules/SetAsterixPhysicalOperatorsRule.java  | 10 +++--
 .../big_object_groupby.3.query.aql              |  2 +
 .../big_object_groupby.3.query.sqlpp            |  1 +
 .../group-by/listify-2/listify-2.1.ddl.sqlpp    | 47 ++++++++++++++++++++
 .../group-by/listify-2/listify-2.2.update.sqlpp | 24 ++++++++++
 .../group-by/listify-2/listify-2.3.query.sqlpp  | 30 +++++++++++++
 .../group-by/listify/listify.1.ddl.sqlpp        | 47 ++++++++++++++++++++
 .../group-by/listify/listify.2.update.sqlpp     | 24 ++++++++++
 .../group-by/listify/listify.3.query.sqlpp      | 33 ++++++++++++++
 .../big_object_groupby/big_object_groupby.3.ast |  1 +
 .../resources/runtimets/testsuite_sqlpp.xml     | 12 +++++
 .../physical/PreclusteredGroupByPOperator.java  |  6 ++-
 .../base/PhysicalOptimizationConfig.java        |  2 +-
 .../rewriter/rules/PushGroupByIntoSortRule.java |  2 +-
 .../SetAlgebricksPhysicalOperatorsRule.java     |  7 +--
 ...estedPlansAccumulatingAggregatorFactory.java | 24 +++++++++-
 .../NestedPlansRunningAggregatorFactory.java    |  2 +-
 ...SerializableAggregatorDescriptorFactory.java |  4 +-
 ...AlgebricksAccumulatingAggregatorFactory.java |  3 +-
 .../hyracks/api/exceptions/ErrorCode.java       |  2 +
 .../src/main/resources/errormsg/en.properties   |  2 +
 ...AccumulatingAggregatorDescriptorFactory.java | 10 ++---
 .../std/group/HashSpillableTableFactory.java    |  2 +-
 .../std/group/IAggregatorDescriptorFactory.java |  2 +-
 .../MultiFieldsAggregatorFactory.java           |  5 +--
 .../PreclusteredGroupOperatorDescriptor.java    | 14 ++++--
 .../PreclusteredGroupOperatorNodePushable.java  |  7 ++-
 .../preclustered/PreclusteredGroupWriter.java   | 22 ++++++---
 28 files changed, 310 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 5aaf87b..d22ec54 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -162,8 +162,8 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
                                     }
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
-                                            physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
+                                            physicalOptimizationConfig.getMaxFramesForGroupBy(),
+                                            (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
                                                     * physicalOptimizationConfig.getFrameSize());
                                     generateMergeAggregationExpressions(gby, context);
                                     op.setPhysicalOperator(externalGby);
@@ -182,7 +182,8 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
                                         columnList.add(varRef.getVariableReference());
                                     }
                                 }
-                                op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+                                op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+                                        context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
                             }
                         }
                     } else if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag()
@@ -196,7 +197,8 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
                                 columnList.add(varRef.getVariableReference());
                             }
                         }
-                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+                                context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
                     } else {
                         throw new AlgebricksException("Unsupported nested operator within a group-by: "
                                 + ((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().name());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
index 493cbb0..4fa4f3e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
@@ -25,6 +25,8 @@
 
 use dataverse test;
 
+set "compiler.groupmemory" "32MB"
+
 for $i in dataset('Line')
 group by $partkey := $i.l_partkey with $i
 order by $partkey

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
index b5388a9..5a5009b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
@@ -25,6 +25,7 @@
 
 use test;
 
+set `compiler.groupmemory` `32MB`
 
 select element {'partkey':partkey,'lines': (from g select value i) }
 from  Line as i

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp
new file mode 100644
index 0000000..5fe734c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp
new file mode 100644
index 0000000..7379646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `compiler.groupmemory` "576KB"
+
+SELECT *
+FROM  LineItem AS l
+WHERE l.l_shipdate <= '1998-09-02'
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus
+ORDER BY l_returnflag, l_linestatus
+;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
new file mode 100644
index 0000000..5fe734c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
new file mode 100644
index 0000000..ba1e7d4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `compiler.groupmemory` "576KB"
+
+SELECT  l_returnflag AS l_returnflag,
+        l_linestatus AS l_linestatus,
+        g
+FROM  LineItem AS l
+WHERE l.l_shipdate <= '1998-09-02'
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus
+GROUP AS g
+ORDER BY l_returnflag, l_linestatus
+;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
index 1b4831f..9747f1e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
@@ -1,4 +1,5 @@
 DataverseUse test
+Set compiler.groupmemory=32MB
 Query:
 SELECT ELEMENT [
 RecordConstructor [

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 70b3486..16d4cb3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2887,6 +2887,18 @@
         <expected-error>Invalid item type: function agg-sum cannot process item type object in an input array (or multiset)</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="group-by">
+      <compilation-unit name="listify">
+        <output-dir compare="Text">none</output-dir>
+        <expected-error>The byte size of a single group</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="group-by">
+      <compilation-unit name="listify-2">
+        <output-dir compare="Text">none</output-dir>
+        <expected-error>The byte size of a single group</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index-join">
     <test-case FilePath="index-join">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 5a465f7..78e4795 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -44,10 +44,12 @@ import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOpera
 public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
 
     private final boolean groupAll;
+    private final int framesLimit;
 
-    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll) {
+    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) {
         super(columnList);
         this.groupAll = groupAll;
+        this.framesLimit = framesLimit;
     }
 
     @Override
@@ -86,7 +88,7 @@ public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOp
                 context);
 
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
-                comparatorFactories, aggregatorFactory, recordDescriptor, groupAll);
+                comparatorFactories, aggregatorFactory, recordDescriptor, groupAll, framesLimit);
 
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 7340882..a242927 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -91,7 +91,7 @@ public class PhysicalOptimizationConfig {
         setInt(MAX_FRAMES_FOR_JOIN, frameLimit);
     }
 
-    public int getMaxFramesExternalGroupBy() {
+    public int getMaxFramesForGroupBy() {
         int frameSize = getFrameSize();
         return getInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize));
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 576bd62..7ea1327 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -102,7 +102,7 @@ public class PushGroupByIntoSortRule implements IAlgebraicRewriteRule {
                         //replace preclustered gby with sort gby
                         if (!groupByOperator.isGroupAll()) {
                             op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByList(),
-                                    context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy(),
+                                    context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
                                     sortPhysicalOperator.getSortColumns()));
                         }
                         // remove the stable sort operator

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 7dc59f6..16ed9cb 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -167,8 +167,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                                 if (hasIntermediateAgg) {
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
-                                            physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
+                                            physicalOptimizationConfig.getMaxFramesForGroupBy(),
+                                            (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
                                                     * physicalOptimizationConfig.getFrameSize());
                                     op.setPhysicalOperator(externalGby);
                                     break;
@@ -187,7 +187,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                         }
                     }
                     if (topLevelOp) {
-                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+                                context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
                     } else {
                         op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList));
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 893aa61..94af04f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -53,7 +54,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
 
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget)
+            throws HyracksDataException {
         final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
@@ -86,6 +88,9 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                // Checks the memory usage.
+                memoryUsageCheck();
+
                 for (int i = 0; i < pipelines.length; i++) {
                     pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
                 }
@@ -98,7 +103,10 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
                     outputWriter.setInputIdx(i);
                     pipelines[i].close();
                 }
-                // outputWriter.writeTuple(appender);
+
+                // Checks the memory usage.
+                memoryUsageCheck();
+
                 tupleBuilder.reset();
                 ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
                 byte[] data = tb.getByteArray();
@@ -136,6 +144,18 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
 
             }
 
+            // Throws GROUP_BY_MEMORY_BUDGET_EXCEEDS if the output tuple buffer outgrows a positive memoryBudget.
+            private void memoryUsageCheck() throws HyracksDataException {
+                if (memoryBudget > 0) {
+                    ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+                    byte[] data = tb.getByteArray();
+                    if (data.length > memoryBudget) {
+                        throw HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS, data.length,
+                                memoryBudget);
+                    }
+                }
+            }
+
         };
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 8b8e320..c261df8 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -60,7 +60,7 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
-            final IFrameWriter writer) throws HyracksDataException {
+            final IFrameWriter writer, long memoryBudget) throws HyracksDataException {
         final RunningAggregatorOutput outputWriter =
                 new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
         // should enforce protocol

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index e4ac701..1e06c76 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -42,8 +42,8 @@ public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatin
 
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
-                    throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+            long memoryBudget) throws HyracksDataException {
         final int[] keys = keyFields;
 
         /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 87f3cb4..50452c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -43,7 +43,8 @@ public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccum
 
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys, long memoryBudget)
+            throws HyracksDataException {
 
         return new IAggregatorDescriptor() {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e6fbc6f..b054faf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -122,6 +122,8 @@ public class ErrorCode {
     public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
     public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
     public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
+    public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
+    public static final int ILLEGAL_MEMORY_BUDGET = 90;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index d2e05e3..1d2143b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -105,5 +105,7 @@
 86 = Found an unrecognized index file %1$s
 87 = Unequal number of trees and filters found in %1$s
 88 = Cannot modify index (Disk is full)
+89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes)
+90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
index 5d13332..d546e5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -32,14 +32,14 @@ public abstract class AbstractAccumulatingAggregatorDescriptorFactory implements
      */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
-            throws HyracksDataException {
-        return this
-                .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer,
+            long memoryBudget) throws HyracksDataException {
+        return createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults,
+                memoryBudget);
     }
 
     abstract protected IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
-            final int[] keyFieldsInPartialResults) throws HyracksDataException;
+            final int[] keyFieldsInPartialResults, long memoryBudget) throws HyracksDataException;
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index cd79a30..43b9685 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -97,7 +97,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
                 hashFunctionFamilies).createPartitioner(seed);
 
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, intermediateResultKeys, null);
+                outRecordDescriptor, keyFields, intermediateResultKeys, null, -1);
 
         final AggregateState aggregateState = aggregator.createAggregateStates();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index 99c787c..d5ed713 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -29,6 +29,6 @@ public interface IAggregatorDescriptorFactory extends Serializable {
 
     IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
-            IFrameWriter writer) throws HyracksDataException;
+            IFrameWriter writer, long memoryBudget) throws HyracksDataException;
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index e326f39..595e2c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -28,7 +28,6 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -58,8 +57,8 @@ public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregator
      */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults,
+            long memoryBudget) throws HyracksDataException {
 
         final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
         for (int i = 0; i < aggregators.length; i++) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 0fe0f54..739de74 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -35,22 +35,30 @@ public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityO
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final IAggregatorDescriptorFactory aggregatorFactory;
     private final boolean groupAll;
+    private final int framesLimit;
 
     public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor recordDescriptor) {
-        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false);
+        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
     }
 
     public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor, boolean groupAll) {
+            RecordDescriptor recordDescriptor, int framesLimit) {
+        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, framesLimit);
+    }
+
+    public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor, boolean groupAll, int framesLimit) {
         super(spec, 1, 1);
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
         this.aggregatorFactory = aggregatorFactory;
         outRecDescs[0] = recordDescriptor;
         this.groupAll = groupAll;
+        this.framesLimit = framesLimit;
     }
 
     @Override
@@ -58,6 +66,6 @@ public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityO
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
             throws HyracksDataException {
         return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
-                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll);
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll, framesLimit);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 2acc4db..0a7444d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -36,12 +36,14 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
     private final RecordDescriptor inRecordDescriptor;
     private final RecordDescriptor outRecordDescriptor;
     private final boolean groupAll;
+    private final int frameLimit;
 
     private PreclusteredGroupWriter pgw;
 
     PreclusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll) {
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll,
+            int frameLimit) {
         this.ctx = ctx;
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
@@ -49,6 +51,7 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
         this.inRecordDescriptor = inRecordDescriptor;
         this.outRecordDescriptor = outRecordDescriptor;
         this.groupAll = groupAll;
+        this.frameLimit = frameLimit;
     }
 
     @Override
@@ -58,7 +61,7 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
-                outRecordDescriptor, writer, false, groupAll);
+                outRecordDescriptor, writer, false, groupAll, frameLimit);
         pgw.open();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6ea45c88/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 3105d42..db6102e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -26,6 +26,7 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -51,28 +52,39 @@ public class PreclusteredGroupWriter implements IFrameWriter {
     private final boolean outputPartial;
     private boolean first;
     private boolean isFailed = false;
+    private final long memoryLimit;
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
-        this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false);
+        this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false, -1);
     }
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
         this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, outputPartial,
-                false);
+                false, -1);
     }
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll)
-            throws HyracksDataException {
+            RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll,
+            int framesLimit) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
+
+        if (framesLimit >= 0 && framesLimit <= 2) {
+            throw HyracksDataException.create(ErrorCode.ILLEGAL_MEMORY_BUDGET, "GROUP BY",
+                    Long.toString(((long) (framesLimit)) * ctx.getInitialFrameSize()),
+                    Long.toString(2L * ctx.getInitialFrameSize()));
+        }
+
+        // Reserves two frames for input/output; a negative framesLimit means no budget (-1).
+        this.memoryLimit = framesLimit <= 0 ? -1 : ((long) (framesLimit - 2)) * ctx.getInitialFrameSize();
         this.aggregator =
-                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
+                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer,
+                        this.memoryLimit);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);


Mime
View raw message