cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject cassandra git commit: Omit (de)serialization of state variable in UDAs
Date Tue, 12 Jul 2016 23:43:35 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 7751588f7 -> adffb3602


Omit (de)serialization of state variable in UDAs

patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-9613


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/adffb360
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/adffb360
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/adffb360

Branch: refs/heads/trunk
Commit: adffb3602033273efdbb8b5303c62dbf33c36903
Parents: 7751588
Author: Robert Stupp <snazy@snazy.de>
Authored: Wed Jul 13 09:43:12 2016 +1000
Committer: Robert Stupp <snazy@snazy.de>
Committed: Wed Jul 13 09:43:12 2016 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/functions/JavaBasedUDFunction.java     | 49 ++++++++++--
 .../cassandra/cql3/functions/JavaUDF.java       |  2 +
 .../cql3/functions/ScriptBasedUDFunction.java   | 25 +++++-
 .../cassandra/cql3/functions/UDAggregate.java   | 52 ++++++++-----
 .../cql3/functions/UDFByteCodeVerifier.java     |  7 ++
 .../cassandra/cql3/functions/UDFunction.java    | 82 ++++++++++++++++++--
 .../cassandra/cql3/functions/JavaSourceUDF.txt  |  8 ++
 .../entities/udfverify/CallClone.java           |  5 ++
 .../entities/udfverify/CallComDatastax.java     |  5 ++
 .../entities/udfverify/CallFinalize.java        |  5 ++
 .../entities/udfverify/CallOrgApache.java       |  5 ++
 .../entities/udfverify/ClassWithField.java      |  5 ++
 .../udfverify/ClassWithInitializer.java         |  5 ++
 .../udfverify/ClassWithInitializer2.java        |  5 ++
 .../udfverify/ClassWithInitializer3.java        |  5 ++
 .../entities/udfverify/ClassWithInnerClass.java |  5 ++
 .../udfverify/ClassWithInnerClass2.java         |  5 ++
 .../udfverify/ClassWithStaticInitializer.java   |  5 ++
 .../udfverify/ClassWithStaticInnerClass.java    |  5 ++
 .../entities/udfverify/GoodClass.java           |  5 ++
 .../entities/udfverify/UseOfSynchronized.java   |  5 ++
 .../udfverify/UseOfSynchronizedWithNotify.java  |  5 ++
 .../UseOfSynchronizedWithNotifyAll.java         |  5 ++
 .../udfverify/UseOfSynchronizedWithWait.java    |  5 ++
 .../udfverify/UseOfSynchronizedWithWaitL.java   |  5 ++
 .../udfverify/UseOfSynchronizedWithWaitLI.java  |  5 ++
 .../entities/udfverify/UsingMapEntry.java       |  5 ++
 .../validation/operations/AggregationTest.java  | 59 +++++++++++++-
 29 files changed, 349 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b0a118..df07ba0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
  * Create a system table to expose prepared statements (CASSANDRA-8831)
  * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
  * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index 87f5019..34c6cc9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@ -191,7 +191,7 @@ public final class JavaBasedUDFunction extends UDFunction
 
         // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
         TypeToken<?>[] javaParamTypes = UDHelper.typeTokens(argCodecs, calledOnNullInput);
-        // javaReturnType is just the Java representation for returnType resp. returnDataType
+        // javaReturnType is just the Java representation for returnType resp. returnTypeCodec
         TypeToken<?> javaReturnType = returnCodec.getJavaType();
 
         // put each UDF in a separate package to prevent cross-UDF code access
@@ -222,7 +222,10 @@ public final class JavaBasedUDFunction extends UDFunction
                         s = body;
                         break;
                     case "arguments":
-                        s = generateArguments(javaParamTypes, argNames);
+                        s = generateArguments(javaParamTypes, argNames, false);
+                        break;
+                    case "arguments_aggregate":
+                        s = generateArguments(javaParamTypes, argNames, true);
                         break;
                     case "argument_list":
                         s = generateArgumentList(javaParamTypes, argNames);
@@ -326,7 +329,7 @@ public final class JavaBasedUDFunction extends UDFunction
                     }
                 }
 
-                if (nonSyntheticMethodCount != 2 || cls.getDeclaredConstructors().length != 1)
+                if (nonSyntheticMethodCount != 3 || cls.getDeclaredConstructors().length != 1)
                     throw new InvalidRequestException("Check your source to not define additional Java methods or constructors");
                 MethodType methodType = MethodType.methodType(void.class)
                                                   .appendParameterTypes(TypeCodec.class, TypeCodec[].class, UDFContext.class);
@@ -364,6 +367,10 @@ public final class JavaBasedUDFunction extends UDFunction
         return javaUDF.executeImpl(protocolVersion, params);
     }
 
+    protected Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        return javaUDF.executeAggregateImpl(protocolVersion, firstParam, params);
+    }
 
     private static int countNewlines(StringBuilder javaSource)
     {
@@ -417,22 +424,48 @@ public final class JavaBasedUDFunction extends UDFunction
         return code.toString();
     }
 
-    private static String generateArguments(TypeToken<?>[] paramTypes, List<ColumnIdentifier> argNames)
+    /**
+     * Generate Java source code snippet for the arguments part to call the UDF implementation function -
+     * i.e. the {@code private #return_type# #execute_internal_name#(#argument_list#)} function
+     * (see {@code JavaSourceUDF.txt} template file for details).
+     * <p>
+     * This method generates the arguments code snippet for both {@code executeImpl} and
+     * {@code executeAggregateImpl}. General signature for both is the {@code protocolVersion} and
+     * then all UDF arguments. For aggregation UDF calls the first argument is always unserialized as
+     * that is the state variable.
+     * </p>
+     * <p>
+     * An example output for {@code executeImpl}:
+     * {@code (double) super.compose_double(protocolVersion, 0, params.get(0)), (double) super.compose_double(protocolVersion, 1, params.get(1))}
+     * </p>
+     * <p>
+     * Similar output for {@code executeAggregateImpl}:
+     * {@code firstParam, (double) super.compose_double(protocolVersion, 1, params.get(1))}
+     * </p>
+     */
+    private static String generateArguments(TypeToken<?>[] paramTypes, List<ColumnIdentifier> argNames, boolean forAggregate)
     {
         StringBuilder code = new StringBuilder(64 * paramTypes.length);
         for (int i = 0; i < paramTypes.length; i++)
         {
             if (i > 0)
+                // add separator, if not the first argument
                 code.append(",\n");
 
+            // add comment only if trace is enabled
             if (logger.isTraceEnabled())
                 code.append("            /* parameter '").append(argNames.get(i)).append("' */\n");
 
-            code
-                // cast to Java type
-                .append("            (").append(javaSourceName(paramTypes[i])).append(") ")
+            // cast to Java type
+            code.append("            (").append(javaSourceName(paramTypes[i])).append(") ");
+
+            if (forAggregate && i == 0)
+                // special case for aggregations where the state variable (1st arg to state + final function and
+                // return value from state function) is not re-serialized
+                code.append("firstParam");
+            else
                 // generate object representation of input parameter (call UDFunction.compose)
-                .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))");
+                code.append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(forAggregate ? i - 1 : i).append("))");
         }
         return code.toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java b/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
index 7410f1f..56a7ced 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
@@ -45,6 +45,8 @@ public abstract class JavaUDF
 
     protected abstract ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params);
 
+    protected abstract Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params);
+
     protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
     {
         return UDFunction.compose(argCodecs, protocolVersion, argIndex, value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
index b524163..8c15dc9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
@@ -177,6 +177,29 @@ final class ScriptBasedUDFunction extends UDFunction
         for (int i = 0; i < params.length; i++)
             params[i] = compose(protocolVersion, i, parameters.get(i));
 
+        Object result = executeScriptInternal(params);
+
+        return decompose(protocolVersion, result);
+    }
+
+    /**
+     * Like {@link #executeUserDefined(int, List)} but the first parameter is already in non-serialized form.
+     * Remaining parameters (the 2nd parameter and all others) are in {@code parameters}.
+     * This is used to prevent superfluous (de)serialization of the state of aggregates.
+     * In other words, the scalar functions of aggregates are called using this variant.
+     */
+    protected Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+    {
+        Object[] params = new Object[argTypes.size()];
+        params[0] = firstParam;
+        for (int i = 1; i < params.length; i++)
+            params[i] = compose(protocolVersion, i, parameters.get(i - 1));
+
+        return executeScriptInternal(params);
+    }
+
+    private Object executeScriptInternal(Object[] params)
+    {
         ScriptContext scriptContext = new SimpleScriptContext();
         scriptContext.setAttribute("javax.script.filename", this.name.toString(), ScriptContext.ENGINE_SCOPE);
         Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE);
@@ -251,7 +274,7 @@ final class ScriptBasedUDFunction extends UDFunction
             }
         }
 
-        return decompose(protocolVersion, result);
+        return result;
     }
 
     private final class UDFContextWrapper extends AbstractJSObject

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index 52b8163..6570ba8 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -24,6 +24,7 @@ import com.google.common.base.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.TypeCodec;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.Functions;
@@ -36,7 +37,9 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDAggregate.class);
 
-    protected final AbstractType<?> stateType;
+    private final AbstractType<?> stateType;
+    private final TypeCodec stateTypeCodec;
+    private final TypeCodec returnTypeCodec;
     protected final ByteBuffer initcond;
     private final ScalarFunction stateFunction;
     private final ScalarFunction finalFunction;
@@ -52,6 +55,8 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
         this.stateFunction = stateFunc;
         this.finalFunction = finalFunc;
         this.stateType = stateFunc != null ? stateFunc.returnType() : null;
+        this.stateTypeCodec = stateType != null ? UDHelper.codecFor(UDHelper.driverType(stateType)) : null;
+        this.returnTypeCodec = returnType != null ? UDHelper.codecFor(UDHelper.driverType(returnType)) : null;
         this.initcond = initcond;
     }
 
@@ -68,7 +73,7 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
         List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
         stateTypes.add(stateType);
         stateTypes.addAll(argTypes);
-        List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
+        List<AbstractType<?>> finalTypes = Collections.singletonList(stateType);
         return new UDAggregate(name,
                                argTypes,
                                returnType,
@@ -81,7 +86,7 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
                                            List<AbstractType<?>> argTypes,
                                            AbstractType<?> returnType,
                                            ByteBuffer initcond,
-                                           final InvalidRequestException reason)
+                                           InvalidRequestException reason)
     {
         return new UDAggregate(name, argTypes, returnType, null, null, initcond)
         {
@@ -150,48 +155,55 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
             private long stateFunctionCount;
             private long stateFunctionDuration;
 
-            private ByteBuffer state;
-            {
-                reset();
-            }
+            private Object state;
+            private boolean needsInit = true;
 
             public void addInput(int protocolVersion, List<ByteBuffer> values) throws InvalidRequestException
             {
+                if (needsInit)
+                {
+                    state = initcond != null ? UDHelper.deserialize(stateTypeCodec, protocolVersion, initcond.duplicate()) : null;
+                    stateFunctionDuration = 0;
+                    stateFunctionCount = 0;
+                    needsInit = false;
+                }
+
                 long startTime = System.nanoTime();
                 stateFunctionCount++;
-                List<ByteBuffer> fArgs = new ArrayList<>(values.size() + 1);
-                fArgs.add(state);
-                fArgs.addAll(values);
                 if (stateFunction instanceof UDFunction)
                 {
                     UDFunction udf = (UDFunction)stateFunction;
-                    if (udf.isCallableWrtNullable(fArgs))
-                        state = udf.execute(protocolVersion, fArgs);
+                    if (udf.isCallableWrtNullable(values))
+                        state = udf.executeForAggregate(protocolVersion, state, values);
                 }
                 else
                 {
-                    state = stateFunction.execute(protocolVersion, fArgs);
+                    throw new UnsupportedOperationException("UDAs only support UDFs");
                 }
                 stateFunctionDuration += (System.nanoTime() - startTime) / 1000;
             }
 
             public ByteBuffer compute(int protocolVersion) throws InvalidRequestException
             {
+                assert !needsInit;
+
                 // final function is traced in UDFunction
                 Tracing.trace("Executed UDA {}: {} call(s) to state function {} in {}\u03bcs", name(), stateFunctionCount, stateFunction.name(), stateFunctionDuration);
                 if (finalFunction == null)
-                    return state;
+                    return UDFunction.decompose(stateTypeCodec, protocolVersion, state);
 
-                List<ByteBuffer> fArgs = Collections.singletonList(state);
-                ByteBuffer result = finalFunction.execute(protocolVersion, fArgs);
-                return result;
+                if (finalFunction instanceof UDFunction)
+                {
+                    UDFunction udf = (UDFunction)finalFunction;
+                    Object result = udf.executeForAggregate(protocolVersion, state, Collections.emptyList());
+                    return UDFunction.decompose(returnTypeCodec, protocolVersion, result);
+                }
+                throw new UnsupportedOperationException("UDAs only support UDFs");
             }
 
             public void reset()
             {
-                state = initcond != null ? initcond.duplicate() : null;
-                stateFunctionDuration = 0;
-                stateFunctionCount = 0;
+                needsInit = true;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java b/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
index cfaa70f..7d28fcd 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
@@ -108,6 +108,13 @@ public final class UDFByteCodeVerifier
                     // the executeImpl method - ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
                     return new ExecuteImplVisitor(errors);
                 }
+                if ("executeAggregateImpl".equals(name) && "(ILjava/lang/Object;Ljava/util/List;)Ljava/lang/Object;".equals(desc))
+                {
+                    if (Opcodes.ACC_PROTECTED != access)
+                        errors.add("executeAggregateImpl not protected");
+                    // the executeAggregateImpl method - Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+                    return new ExecuteImplVisitor(errors);
+                }
                 if ("<clinit>".equals(name))
                 {
                     errors.add("static initializer declared");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 6e8d187..70d459f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -28,6 +28,7 @@ import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -258,12 +259,22 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
                 return Executors.newSingleThreadExecutor();
             }
 
+            protected Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+            {
+                throw broken();
+            }
+
             public ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> parameters)
             {
-                throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully "
-                                                                + "for the following reason: %s. Please see the server log for details",
-                                                                this,
-                                                                reason.getMessage()));
+                throw broken();
+            }
+
+            private InvalidRequestException broken()
+            {
+                return new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully "
+                                                                 + "for the following reason: %s. Please see the server log for details",
+                                                                 this,
+                                                                 reason.getMessage()));
             }
         };
     }
@@ -301,6 +312,44 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         }
     }
 
+    /**
+     * Like {@link #execute(int, List)} but the first parameter is already in non-serialized form.
+     * Remaining parameters (the 2nd parameter and all others) are in {@code parameters}.
+     * This is used to prevent superfluous (de)serialization of the state of aggregates.
+     * In other words, the scalar functions of aggregates are called using this variant.
+     */
+    public final Object executeForAggregate(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+    {
+        assertUdfsEnabled(language);
+
+        if (!calledOnNullInput && firstParam == null || !isCallableWrtNullable(parameters))
+            return null;
+
+        long tStart = System.nanoTime();
+        parameters = makeEmptyParametersNull(parameters);
+
+        try
+        {
+            // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr).
+            Object result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
+                                ? executeAggregateAsync(protocolVersion, firstParam, parameters)
+                                : executeAggregateUserDefined(protocolVersion, firstParam, parameters);
+            Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
+            return result;
+        }
+        catch (InvalidRequestException e)
+        {
+            throw e;
+        }
+        catch (Throwable t)
+        {
+            logger.debug("Invocation of user-defined function '{}' failed", this, t);
+            if (t instanceof VirtualMachineError)
+                throw (VirtualMachineError) t;
+            throw FunctionExecutionException.create(this, t);
+        }
+    }
+
     public static void assertUdfsEnabled(String language)
     {
         if (!DatabaseDescriptor.enableUserDefinedFunctions())
@@ -344,10 +393,31 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
 
-        Future<ByteBuffer> future = executor().submit(() -> {
+        return async(threadIdAndCpuTime, () -> {
             threadIdAndCpuTime.setup();
             return executeUserDefined(protocolVersion, parameters);
         });
+    }
+
+    /**
+     * Like {@link #executeAsync(int, List)} but the first parameter is already in non-serialized form.
+     * Remaining parameters (the 2nd parameter and all others) are in {@code parameters}.
+     * This is used to prevent superfluous (de)serialization of the state of aggregates.
+     * In other words, the scalar functions of aggregates are called using this variant.
+     */
+    private Object executeAggregateAsync(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+    {
+        ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
+
+        return async(threadIdAndCpuTime, () -> {
+            threadIdAndCpuTime.setup();
+            return executeAggregateUserDefined(protocolVersion, firstParam, parameters);
+        });
+    }
+
+    private <T> T async(ThreadIdAndCpuTime threadIdAndCpuTime, Callable<T> callable)
+    {
+        Future<T> future = executor().submit(callable);
 
         try
         {
@@ -445,6 +515,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 
     protected abstract ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> parameters);
 
+    protected abstract Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> parameters);
+
     public boolean isAggregate()
     {
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt b/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
index d736a5a..802081f 100644
--- a/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
+++ b/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
@@ -25,6 +25,14 @@ public final class #class_name# extends JavaUDF
         return super.decompose(protocolVersion, result);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        #return_type# result = #execute_internal_name#(
+#arguments_aggregate#
+        );
+        return result;
+    }
+
     private #return_type# #execute_internal_name#(#argument_list#)
     {
 #body#

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
index e8bae70..9efa83a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
@@ -35,6 +35,11 @@ public final class CallClone extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
index 1af5b01..4555ff5 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
@@ -36,6 +36,11 @@ public final class CallComDatastax extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         DataType.cint();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
index 5208849..b1ec15f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
@@ -35,6 +35,11 @@ public final class CallFinalize extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
index 758d0d0..728e482 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
@@ -36,6 +36,11 @@ public final class CallOrgApache extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         DatabaseDescriptor.getClusterName();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
index 256c2bd..4c38b44 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
@@ -35,6 +35,11 @@ public final class ClassWithField extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
index 3366314..cc2738a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
@@ -35,6 +35,11 @@ public final class ClassWithInitializer extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
index aaf3e7b..780c0e4 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
@@ -35,6 +35,11 @@ public final class ClassWithInitializer2 extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
index 4895aa0..e163ec9 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
@@ -35,6 +35,11 @@ public final class ClassWithInitializer3 extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
index 2166771..3c4dc9b 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
@@ -35,6 +35,11 @@ public final class ClassWithInnerClass extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
index 9c18510..b316040 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
@@ -35,6 +35,11 @@ public final class ClassWithInnerClass2 extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         // this is fine

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
index 3c958e8..c97a94a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
@@ -35,6 +35,11 @@ public final class ClassWithStaticInitializer extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
index fada145..1b019cc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
@@ -35,6 +35,11 @@ public final class ClassWithStaticInnerClass extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
index eb25f72..54821b9 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
@@ -35,6 +35,11 @@ public final class GoodClass extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
index bbbc823..dba846d 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronized extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         synchronized (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
index 07c70c7..63c319c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithNotify extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         synchronized (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
index 529c995..4d0c2a0 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithNotifyAll extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         synchronized (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
index 6e39813..b002086 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithWait extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         synchronized (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
index ac29211..f128fac 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithWaitL extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         synchronized (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
index 3b9ce8b..d439518 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithWaitLI extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         synchronized (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
index 5091dc1..b99dbfd 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
@@ -37,6 +37,11 @@ public final class UsingMapEntry extends JavaUDF
         super(returnDataType, argDataTypes, udfContext);
     }
 
+    protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
     {
         Map<String, String> map = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index 24a9528..506d533 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.cql3.validation.operations;
 
 import java.math.BigDecimal;
-import java.math.MathContext;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
@@ -40,6 +39,8 @@ import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.spi.TurboFilterList;
 import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
 import ch.qos.logback.classic.turbo.TurboFilter;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TupleValue;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -48,7 +49,6 @@ import org.apache.cassandra.cql3.UntypedResultSet.Row;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.DynamicCompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -1883,6 +1883,7 @@ public class AggregationTest extends CQLTester
                    row(finalFunc, initCond));
     }
 
+    @Test
     public void testCustomTypeInitcond() throws Throwable
     {
         try
@@ -1966,4 +1967,58 @@ public class AggregationTest extends CQLTester
         assertRows(execute("select avg(val) from %s where bucket in (1, 2, 3);"),
                    row(a));
     }
+
+    @Test
+    public void testSameStateInstance() throws Throwable
+    {
+        // CASSANDRA-9613 removes the necessity to re-serialize the state variable for each
+        // UDA state function and final function call.
+        //
+        // To test that the same state object instance is used during each invocation of the
+        // state and final function, this test uses a trick:
+        // it puts the identity hash code of the state variable into a tuple. The test then
+        // just asserts that the identity hash code is the same for all invocations
+        // of the state function and the final function.
+
+        String sf = createFunction(KEYSPACE,
+                                  "tuple<int,int,int,int>, int",
+                                  "CREATE FUNCTION %s(s tuple<int,int,int,int>, i int) " +
+                                  "CALLED ON NULL INPUT " +
+                                  "RETURNS tuple<int,int,int,int> " +
+                                  "LANGUAGE java " +
+                                  "AS 's.setInt(i, System.identityHashCode(s)); return s;'");
+
+        String ff = createFunction(KEYSPACE,
+                                  "tuple<int,int,int,int>",
+                                  "CREATE FUNCTION %s(s tuple<int,int,int,int>) " +
+                                  "CALLED ON NULL INPUT " +
+                                  "RETURNS tuple<int,int,int,int> " +
+                                  "LANGUAGE java " +
+                                  "AS 's.setInt(3, System.identityHashCode(s)); return s;'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(sf) + ' ' +
+                                   "STYPE tuple<int,int,int,int> " +
+                                   "FINALFUNC " + shortFunctionName(ff) + ' ' +
+                                   "INITCOND (0,1,2)");
+
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (0, 0)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        try (Session s = sessionNet())
+        {
+            com.datastax.driver.core.Row row = s.execute("SELECT " + a + "(b) FROM " + KEYSPACE + '.' + currentTable()).one();
+            TupleValue tuple = row.getTupleValue(0);
+            int h0 = tuple.getInt(0);
+            int h1 = tuple.getInt(1);
+            int h2 = tuple.getInt(2);
+            int h3 = tuple.getInt(3);
+            assertEquals(h0, h1);
+            assertEquals(h0, h2);
+            assertEquals(h0, h3);
+        }
+    }
 }


Mime
View raw message