asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [22/24] incubator-asterixdb git commit: Move to non-copy-based evaluator interfaces for all function implementations, including: - scalar functions, - aggregate functions, - running aggregate functions, - unnesting functions
Date Sat, 13 Feb 2016 02:15:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index 88585fb..b7507b1 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -25,14 +25,17 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
-public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+public class SerializableGlobalSqlAvgAggregateDescriptor
+        extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableGlobalSqlAvgAggregateDescriptor();
         }
@@ -44,14 +47,15 @@ public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSeriali
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableGlobalSqlAvgAggregateFunction(args);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableGlobalSqlAvgAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
index d2cc167..ebd8e89 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -22,13 +22,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
 import java.io.DataOutput;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableGlobalSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
index 1956c7d..e9920e2 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
@@ -25,15 +25,17 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
-public class SerializableIntermediateAvgAggregateDescriptor extends
-        AbstractSerializableAggregateFunctionDynamicDescriptor {
+public class SerializableIntermediateAvgAggregateDescriptor
+        extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableIntermediateAvgAggregateDescriptor();
         }
@@ -45,13 +47,15 @@ public class SerializableIntermediateAvgAggregateDescriptor extends
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableIntermediateAvgAggregateFunction(args);
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableIntermediateAvgAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
index af21c9f..3ea3d76 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
@@ -24,13 +24,15 @@ import java.io.DataOutput;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableIntermediateAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableIntermediateAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableIntermediateAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
index 91f6f4e..aa4d699 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
@@ -25,14 +25,17 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
-public class SerializableIntermediateSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+public class SerializableIntermediateSqlAvgAggregateDescriptor
+        extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableIntermediateSqlAvgAggregateDescriptor();
         }
@@ -44,14 +47,15 @@ public class SerializableIntermediateSqlAvgAggregateDescriptor extends AbstractS
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableIntermediateSqlAvgAggregateFunction(args);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableIntermediateSqlAvgAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
index 920dae2..bd58214 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
@@ -22,13 +22,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
 import java.io.DataOutput;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableIntermediateSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableIntermediateSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableIntermediateSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index a29c37a..8fd080a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -25,14 +25,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalAvgAggregateDescriptor();
         }
@@ -44,13 +46,15 @@ public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializabl
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableLocalAvgAggregateFunction(args);
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableLocalAvgAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
index 1b7772f..c15a937 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -24,13 +24,15 @@ import java.io.DataOutput;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableLocalAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index b31e69f..f96a053 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -25,14 +25,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSqlAvgAggregateDescriptor();
         }
@@ -44,13 +46,15 @@ public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializ
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableLocalSqlAvgAggregateFunction(args);
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableLocalSqlAvgAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
index e2f40e5..3609050 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -22,13 +22,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
 import java.io.DataOutput;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableLocalSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index 3b8bf0a..4392e6e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSqlSumAggregateDescriptor();
         }
@@ -43,14 +45,15 @@ public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializ
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSqlSumAggregateFunction(args, true);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableSqlSumAggregateFunction(args, true, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index b2fab9c..e610077 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSumAggregateDescriptor();
         }
@@ -43,14 +45,15 @@ public class SerializableLocalSumAggregateDescriptor extends AbstractSerializabl
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSumAggregateFunction(args, true);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableSumAggregateFunction(args, true, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
index 55151dc..ef1914a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableSqlAvgAggregateDescriptor();
         }
@@ -43,13 +45,15 @@ public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableA
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSqlAvgAggregateFunction(args);
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableSqlAvgAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
index e7205ac..8b62efb 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -21,13 +21,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
 import java.io.DataOutput;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
index 1998e18..2a5bc53 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -24,9 +24,10 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * count(NULL) returns NULL.
@@ -35,6 +36,7 @@ public class SerializableSqlCountAggregateDescriptor extends AbstractSerializabl
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableSqlCountAggregateDescriptor();
         }
@@ -46,14 +48,15 @@ public class SerializableSqlCountAggregateDescriptor extends AbstractSerializabl
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSqlCountAggregateFunction(args);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableSqlCountAggregateFunction(args, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
index aaf9df7..24cd674 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
@@ -19,14 +19,16 @@
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * count(NULL) returns NULL.
  */
 public class SerializableSqlCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
-    public SerializableSqlCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        super(args);
+    public SerializableSqlCountAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+            throws AlgebricksException {
+        super(args, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index 9813ad7..dec2688 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableSqlSumAggregateDescriptor();
         }
@@ -43,14 +45,15 @@ public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableA
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSqlSumAggregateFunction(args, false);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableSqlSumAggregateFunction(args, false, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
index 8d82dde..f34d1be 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -26,14 +26,15 @@ import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
     private final boolean isLocalAgg;
 
-    public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
-            throws AlgebricksException {
-        super(args);
+    public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
+            IHyracksTaskContext context) throws AlgebricksException {
+        super(args, context);
         this.isLocalAgg = isLocalAgg;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index 74717ed..8a7cdef 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableSumAggregateDescriptor();
         }
@@ -43,14 +45,15 @@ public class SerializableSumAggregateDescriptor extends AbstractSerializableAggr
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopySerializableAggregateFunctionFactory() {
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+        return new ISerializedAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSumAggregateFunction(args, false);
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws AlgebricksException {
+                return new SerializableSumAggregateFunction(args, false, ctx);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index ef0dc3b..e5190ae 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -27,14 +27,15 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
     private final boolean isLocalAgg;
 
-    public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
-            throws AlgebricksException {
-        super(args);
+    public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
+            IHyracksTaskContext context) throws AlgebricksException {
+        super(args, context);
         this.isLocalAgg = isLocalAgg;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
index 37d4b05..a57aacd 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -46,37 +46,39 @@ import org.apache.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
 import org.apache.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+public abstract class AbstractAvgAggregateFunction implements IAggregateEvaluator {
     private static final int SUM_FIELD_ID = 0;
     private static final int COUNT_FIELD_ID = 1;
 
     private final ARecordType recType;
 
-    private DataOutput out;
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
+    private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    private IPointable inputVal = new VoidPointable();
+    private IScalarEvaluator eval;
     protected ATypeTag aggType;
     private double sum;
     private long count;
     private AMutableDouble aDouble = new AMutableDouble(0);
     private AMutableInt64 aInt64 = new AMutableInt64(0);
 
-    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private IPointable avgBytes = new VoidPointable();
     private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
     private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
     private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
     private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private IScalarEvaluator evalSum = new AccessibleByteArrayEval(sumBytes);
+    private IScalarEvaluator evalCount = new AccessibleByteArrayEval(countBytes);
     private ClosedRecordConstructorEval recordEval;
 
     @SuppressWarnings("unchecked")
@@ -89,15 +91,13 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ANULL);
 
-    public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+    public AbstractAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
             throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
-        out = output.getDataOutput();
+        eval = args[0].createScalarEvaluator(context);
 
         recType = new ARecordType(null, new String[] { "sum", "count" },
                 new IAType[] { BuiltinType.ADOUBLE, BuiltinType.AINT64 }, false);
-        recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
-                out);
+        recordEval = new ClosedRecordConstructorEval(recType, new IScalarEvaluator[] { evalSum, evalCount });
     }
 
     @Override
@@ -111,10 +111,10 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
     public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
 
     @Override
-    public abstract void finish() throws AlgebricksException;
+    public abstract void finish(IPointable result) throws AlgebricksException;
 
     @Override
-    public abstract void finishPartial() throws AlgebricksException;
+    public abstract void finishPartial(IPointable result) throws AlgebricksException;
 
     protected abstract void processNull();
 
@@ -122,9 +122,11 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
         if (skipStep()) {
             return;
         }
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        eval.evaluate(tuple, inputVal);
+        byte[] data = inputVal.getByteArray();
+        int offset = inputVal.getStartOffset();
+
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]);
         if (typeTag == ATypeTag.NULL) {
             processNull();
             return;
@@ -139,32 +141,32 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
         ++count;
         switch (typeTag) {
             case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                byte val = AInt8SerializerDeserializer.getByte(data, offset + 1);
                 sum += val;
                 break;
             }
             case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                short val = AInt16SerializerDeserializer.getShort(data, offset + 1);
                 sum += val;
                 break;
             }
             case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                int val = AInt32SerializerDeserializer.getInt(data, offset + 1);
                 sum += val;
                 break;
             }
             case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                long val = AInt64SerializerDeserializer.getLong(data, offset + 1);
                 sum += val;
                 break;
             }
             case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                float val = AFloatSerializerDeserializer.getFloat(data, offset + 1);
                 sum += val;
                 break;
             }
             case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1);
                 sum += val;
                 break;
             }
@@ -172,19 +174,21 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
                 throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
             }
         }
-        inputVal.reset();
     }
 
-    protected void finishPartialResults() throws AlgebricksException {
+    protected void finishPartialResults(IPointable result) throws AlgebricksException {
+        resultStorage.reset();
         try {
             // Double check that count 0 is accounted
             if (aggType == ATypeTag.SYSTEM_NULL) {
                 if (GlobalConfig.DEBUG) {
                     GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
                 }
-                out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+                resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+                result.set(resultStorage);
             } else if (aggType == ATypeTag.NULL) {
-                out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                result.set(resultStorage);
             } else {
                 sumBytes.reset();
                 aDouble.setValue(sum);
@@ -192,7 +196,8 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
                 countBytes.reset();
                 aInt64.setValue(count);
                 longSerde.serialize(aInt64, countBytesOutput);
-                recordEval.evaluate(null);
+                recordEval.evaluate(null, avgBytes);
+                result.set(avgBytes);
             }
         } catch (IOException e) {
             throw new AlgebricksException(e);
@@ -203,10 +208,10 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
         if (skipStep()) {
             return;
         }
-        inputVal.reset();
-        eval.evaluate(tuple);
+        eval.evaluate(tuple, inputVal);
         byte[] serBytes = inputVal.getByteArray();
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        int offset = inputVal.getStartOffset();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
         switch (typeTag) {
             case NULL: {
                 processNull();
@@ -220,11 +225,11 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
                 // Expected.
                 aggType = ATypeTag.DOUBLE;
                 int nullBitmapSize = 0;
-                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
-                        false);
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, SUM_FIELD_ID,
+                        nullBitmapSize, false);
                 sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID, nullBitmapSize,
-                        false);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
                 count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
                 break;
             }
@@ -235,17 +240,19 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
         }
     }
 
-    protected void finishFinalResults() throws AlgebricksException {
+    protected void finishFinalResults(IPointable result) throws AlgebricksException {
+        resultStorage.reset();
         try {
             if (count == 0 || aggType == ATypeTag.NULL) {
-                nullSerde.serialize(ANull.NULL, out);
+                nullSerde.serialize(ANull.NULL, resultStorage.getDataOutput());
             } else {
                 aDouble.setValue(sum / count);
-                doubleSerde.serialize(aDouble, out);
+                doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
             }
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
+        result.set(resultStorage);
     }
 
     protected boolean skipStep() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
index 9523d90..f93617d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -28,31 +27,33 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
  */
-public abstract class AbstractCountAggregateFunction implements ICopyAggregateFunction {
+public abstract class AbstractCountAggregateFunction implements IAggregateEvaluator {
     private AMutableInt64 result = new AMutableInt64(-1);
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.AINT64);
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
+    private IPointable inputVal = new VoidPointable();
+    private IScalarEvaluator eval;
     protected long cnt;
-    private DataOutput out;
 
-    public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+    private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+    public AbstractCountAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
             throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
-        out = output.getDataOutput();
+        eval = args[0].createScalarEvaluator(context);
     }
 
     @Override
@@ -62,9 +63,9 @@ public abstract class AbstractCountAggregateFunction implements ICopyAggregateFu
 
     @Override
     public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        eval.evaluate(tuple, inputVal);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                .deserialize(inputVal.getByteArray()[inputVal.getStartOffset()]);
         // Ignore SYSTEM_NULL.
         if (typeTag == ATypeTag.NULL) {
             processNull();
@@ -74,18 +75,20 @@ public abstract class AbstractCountAggregateFunction implements ICopyAggregateFu
     }
 
     @Override
-    public void finish() throws AlgebricksException {
+    public void finish(IPointable resultPointable) throws AlgebricksException {
+        resultStorage.reset();
         try {
             result.setValue(cnt);
-            int64Serde.serialize(result, out);
+            int64Serde.serialize(result, resultStorage.getDataOutput());
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
+        resultPointable.set(resultStorage);
     }
 
     @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
+    public void finishPartial(IPointable resultPointable) throws AlgebricksException {
+        finish(resultPointable);
     }
 
     protected abstract void processNull();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index e934ddb..c5a0104 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -27,46 +26,48 @@ import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.om.types.hierachy.ITypeConvertComputer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+public abstract class AbstractMinMaxAggregateFunction implements IAggregateEvaluator {
+    private IPointable inputVal = new VoidPointable();
     private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
     private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
-    protected DataOutput out;
-    private ICopyEvaluator eval;
+
+    protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    private IScalarEvaluator eval;
     protected ATypeTag aggType;
     private IBinaryComparator cmp;
     private ITypeConvertComputer tpc;
     private final boolean isMin;
 
-    public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+    public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isMin)
             throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
+        eval = args[0].createScalarEvaluator(context);
         this.isMin = isMin;
     }
 
     @Override
     public void init() {
         aggType = ATypeTag.SYSTEM_NULL;
-        outputVal.reset();
         tempValForCasting.reset();
     }
 
     @Override
     public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        eval.evaluate(tuple, inputVal);
+
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                .deserialize(inputVal.getByteArray()[inputVal.getStartOffset()]);
         if (typeTag == ATypeTag.NULL) {
             processNull();
             return;
@@ -110,7 +111,6 @@ public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateF
                     } catch (IOException e) {
                         throw new AlgebricksException(e);
                     }
-                    outputVal.reset();
                     outputVal.assign(tempValForCasting);
                 }
                 try {
@@ -157,19 +157,22 @@ public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateF
     }
 
     @Override
-    public void finish() throws AlgebricksException {
+    public void finish(IPointable result) throws AlgebricksException {
+        resultStorage.reset();
         try {
             switch (aggType) {
                 case NULL: {
-                    out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                    resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                    result.set(resultStorage);
                     break;
                 }
                 case SYSTEM_NULL: {
                     finishSystemNull();
+                    result.set(resultStorage);
                     break;
                 }
                 default: {
-                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+                    result.set(outputVal);
                     break;
                 }
             }
@@ -179,8 +182,8 @@ public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateF
     }
 
     @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
+    public void finishPartial(IPointable result) throws AlgebricksException {
+        finish(result);
     }
 
     protected boolean skipStep() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
index fcf1850..2948887 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
@@ -41,18 +40,20 @@ import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
-    protected DataOutput out;
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
+public abstract class AbstractSumAggregateFunction implements IAggregateEvaluator {
+    protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    private IPointable inputVal = new VoidPointable();
+    private IScalarEvaluator eval;
     private double sum;
     protected ATypeTag aggType;
     private AMutableDouble aDouble = new AMutableDouble(0);
@@ -64,10 +65,9 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
     @SuppressWarnings("rawtypes")
     protected ISerializerDeserializer serde;
 
-    public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
+    public AbstractSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
             throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
+        eval = args[0].createScalarEvaluator(context);
     }
 
     @Override
@@ -81,9 +81,11 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
         if (skipStep()) {
             return;
         }
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        eval.evaluate(tuple, inputVal);
+        byte[] data = inputVal.getByteArray();
+        int offset = inputVal.getStartOffset();
+
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]);
         if (typeTag == ATypeTag.NULL) {
             processNull();
             return;
@@ -100,32 +102,32 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
 
         switch (typeTag) {
             case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                byte val = AInt8SerializerDeserializer.getByte(data, offset + 1);
                 sum += val;
                 break;
             }
             case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                short val = AInt16SerializerDeserializer.getShort(data, offset + 1);
                 sum += val;
                 break;
             }
             case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                int val = AInt32SerializerDeserializer.getInt(data, offset + 1);
                 sum += val;
                 break;
             }
             case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                long val = AInt64SerializerDeserializer.getLong(data, offset + 1);
                 sum += val;
                 break;
             }
             case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                float val = AFloatSerializerDeserializer.getFloat(data, offset + 1);
                 sum += val;
                 break;
             }
             case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1);
                 sum += val;
                 break;
             }
@@ -144,48 +146,49 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
 
     @SuppressWarnings("unchecked")
     @Override
-    public void finish() throws AlgebricksException {
+    public void finish(IPointable result) throws AlgebricksException {
+        resultStorage.reset();
         try {
             switch (aggType) {
                 case INT8: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
                     aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
+                    serde.serialize(aInt8, resultStorage.getDataOutput());
                     break;
                 }
                 case INT16: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
                     aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
+                    serde.serialize(aInt16, resultStorage.getDataOutput());
                     break;
                 }
                 case INT32: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
                     aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
+                    serde.serialize(aInt32, resultStorage.getDataOutput());
                     break;
                 }
                 case INT64: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
                     aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
+                    serde.serialize(aInt64, resultStorage.getDataOutput());
                     break;
                 }
                 case FLOAT: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
                     aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
+                    serde.serialize(aFloat, resultStorage.getDataOutput());
                     break;
                 }
                 case DOUBLE: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
                     aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
+                    serde.serialize(aDouble, resultStorage.getDataOutput());
                     break;
                 }
                 case NULL: {
                     serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
+                    serde.serialize(ANull.NULL, resultStorage.getDataOutput());
                     break;
                 }
                 case SYSTEM_NULL: {
@@ -193,23 +196,27 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
                     break;
                 }
                 default:
-                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
-                            + aggType + "). ");
+                    throw new AlgebricksException(
+                            "SumAggregationFunction: incompatible type for the result (" + aggType + "). ");
             }
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
+        result.set(resultStorage);
     }
 
     @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
+    public void finishPartial(IPointable result) throws AlgebricksException {
+        finish(result);
     }
 
     protected boolean skipStep() {
         return false;
     }
+
     protected abstract void processNull();
+
     protected abstract void processSystemNull() throws AlgebricksException;
+
     protected abstract void finishSystemNull() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 0c24ef1..2b5c9bb 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -25,15 +25,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new AvgAggregateDescriptor();
         }
@@ -45,15 +46,15 @@ public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDesc
     }
 
     @Override
-    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args)
             throws AlgebricksException {
-        return new ICopyAggregateFunctionFactory() {
+        return new IAggregateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+            public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                     throws AlgebricksException {
-                return new AvgAggregateFunction(args, provider);
+                return new AvgAggregateFunction(args, ctx);
             }
         };
     }



Mime
View raw message