cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [1/2] cassandra git commit: Support for user-defined aggregate functions
Date Thu, 11 Dec 2014 17:47:14 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 857de5540 -> e2f35c767


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
new file mode 100644
index 0000000..118f89d
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+/**
+ * A <code>DROP AGGREGATE</code> statement parsed from a CQL query.
+ */
+public final class DropAggregateStatement extends SchemaAlteringStatement
+{
+    private FunctionName functionName;
+    private final boolean ifExists;
+    private final List<CQL3Type.Raw> argRawTypes;
+    private final boolean argsPresent;
+
+    public DropAggregateStatement(FunctionName functionName,
+                                  List<CQL3Type.Raw> argRawTypes,
+                                  boolean argsPresent,
+                                  boolean ifExists)
+    {
+        this.functionName = functionName;
+        this.argRawTypes = argRawTypes;
+        this.argsPresent = argsPresent;
+        this.ifExists = ifExists;
+    }
+
+    public void prepareKeyspace(ClientState state) throws InvalidRequestException
+    {
+        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
+            functionName = new FunctionName(state.getKeyspace(), functionName.name);
+
+        if (!functionName.hasKeyspace())
+            throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
+
+        ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace);
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasKeyspaceAccess(functionName.keyspace, Permission.DROP);
+    }
+
+    public void validate(ClientState state) throws RequestValidationException
+    {
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        List<Function> olds = Functions.find(functionName);
+
+        if (!argsPresent && olds != null && olds.size() > 1)
+            throw new InvalidRequestException(String.format("'DROP AGGREGATE %s' matches multiple function definitions; " +
+                                                            "specify the argument types by issuing a statement like " +
+                                                            "'DROP AGGREGATE %s (type, type, ...)'. Hint: use cqlsh " +
+                                                            "'DESCRIBE AGGREGATE %s' command to find all overloads",
+                                                            functionName, functionName, functionName));
+
+        List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
+        for (CQL3Type.Raw rawType : argRawTypes)
+            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+
+        Function old;
+        if (argsPresent)
+        {
+            old = Functions.find(functionName, argTypes);
+            if (old == null || !(old instanceof AggregateFunction))
+            {
+                if (ifExists)
+                    return false;
+                // just build a nicer error message
+                StringBuilder sb = new StringBuilder();
+                for (CQL3Type.Raw rawType : argRawTypes)
+                {
+                    if (sb.length() > 0)
+                        sb.append(", ");
+                    sb.append(rawType);
+                }
+                throw new InvalidRequestException(String.format("Cannot drop non existing aggregate '%s(%s)'",
+                                                                functionName, sb));
+            }
+        }
+        else
+        {
+            if (olds == null || olds.isEmpty() || !(olds.get(0) instanceof AggregateFunction))
+            {
+                if (ifExists)
+                    return false;
+                throw new InvalidRequestException(String.format("Cannot drop non existing aggregate '%s'", functionName));
+            }
+            old = olds.get(0);
+        }
+
+        if (old.isNative())
+            throw new InvalidRequestException(String.format("Cannot drop aggregate '%s' because it is a " +
+                                                            "native (built-in) function", functionName));
+
+        MigrationManager.announceAggregateDrop((UDAggregate)old, isLocalOnly);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 0ba3721..394aca0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -60,7 +60,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
             functionName = new FunctionName(state.getKeyspace(), functionName.name);
 
         if (!functionName.hasKeyspace())
-            throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name");
+            throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
 
         ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace);
     }
@@ -73,11 +73,6 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         state.hasKeyspaceAccess(functionName.keyspace, Permission.DROP);
     }
 
-    /**
-     * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
-     * from these statements, so this method is responsible for processing and
-     * validating.
-     */
     @Override
     public void validate(ClientState state)
     {
@@ -109,7 +104,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         if (argsPresent)
         {
             old = Functions.find(functionName, argTypes);
-            if (old == null)
+            if (old == null || !(old instanceof ScalarFunction))
             {
                 if (ifExists)
                     return false;
@@ -127,7 +122,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         }
         else
         {
-            if (olds == null || olds.isEmpty())
+            if (olds == null || olds.isEmpty() || !(olds.get(0) instanceof ScalarFunction))
             {
                 if (ifExists)
                     return false;
@@ -136,7 +131,11 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
             old = olds.get(0);
         }
 
-        MigrationManager.announceFunctionDrop((UDFunction)old, isLocalOnly);
+        List<Function> references = Functions.getReferencesTo(old);
+        if (!references.isEmpty())
+            throw new InvalidRequestException(String.format("Function '%s' still referenced by %s", functionName, references));
+
+        MigrationManager.announceFunctionDrop((UDFunction) old, isLocalOnly);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index bcb0893..82a5dd1 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -118,7 +119,8 @@ public class DefsTables
         Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
+        Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces);
 
         for (Mutation mutation : mutations)
             mutation.apply();
@@ -130,12 +132,14 @@ public class DefsTables
         Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
+        Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
         mergeTypes(oldTypes, newTypes);
         mergeFunctions(oldFunctions, newFunctions);
+        mergeAggregates(oldAggregates, newAggregates);
 
         // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
         for (String keyspaceToDrop : keyspacesToDrop)
@@ -348,6 +352,59 @@ public class DefsTables
             dropFunction(udf);
     }
 
+    // see the comments for mergeKeyspaces()
+    private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<UDAggregate> created = new ArrayList<>();
+        List<UDAggregate> altered = new ArrayList<>();
+        List<UDAggregate> dropped = new ArrayList<>();
+
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        // New keyspace with functions
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), entry.getValue())).values());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            ColumnFamily pre = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+            {
+                MapDifference<Composite, UDAggregate> delta =
+                        Maps.difference(UDAggregate.fromSchema(new Row(entry.getKey(), pre)),
+                                        UDAggregate.fromSchema(new Row(entry.getKey(), post)));
+
+                dropped.addAll(delta.entriesOnlyOnLeft().values());
+                created.addAll(delta.entriesOnlyOnRight().values());
+                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
+                {
+                    public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
+                    {
+                        return pair.rightValue();
+                    }
+                }));
+            }
+            else if (pre.hasColumns())
+            {
+                dropped.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), pre)).values());
+            }
+            else if (post.hasColumns())
+            {
+                created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), post)).values());
+            }
+        }
+
+        for (UDAggregate udf : created)
+            addAggregate(udf);
+        for (UDAggregate udf : altered)
+            updateAggregate(udf);
+        for (UDAggregate udf : dropped)
+            dropAggregate(udf);
+    }
+
     private static void addKeyspace(KSMetaData ksm)
     {
         assert Schema.instance.getKSMetaData(ksm.name) == null;
@@ -397,6 +454,15 @@ public class DefsTables
         MigrationManager.instance.notifyCreateFunction(udf);
     }
 
+    private static void addAggregate(UDAggregate udf)
+    {
+        logger.info("Loading {}", udf);
+
+        Functions.addFunction(udf);
+
+        MigrationManager.instance.notifyCreateAggregate(udf);
+    }
+
     private static void updateKeyspace(String ksName)
     {
         KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName);
@@ -441,6 +507,15 @@ public class DefsTables
         MigrationManager.instance.notifyUpdateFunction(udf);
     }
 
+    private static void updateAggregate(UDAggregate udf)
+    {
+        logger.info("Updating {}", udf);
+
+        Functions.replaceFunction(udf);
+
+        MigrationManager.instance.notifyUpdateAggregate(udf);
+    }
+
     private static void dropKeyspace(String ksName)
     {
         KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -520,6 +595,16 @@ public class DefsTables
         MigrationManager.instance.notifyDropFunction(udf);
     }
 
+    private static void dropAggregate(UDAggregate udf)
+    {
+        logger.info("Drop {}", udf);
+
+        // TODO: this is kind of broken as this remove all overloads of the function name
+        Functions.removeFunction(udf.name(), udf.argTypes());
+
+        MigrationManager.instance.notifyDropAggregate(udf);
+    }
+
     private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
     {
         // clone ksm but do not include the new def

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ddf6fa0..3e8b0a2 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -76,6 +76,7 @@ public final class SystemKeyspace
     public static final String SCHEMA_TRIGGERS_TABLE = "schema_triggers";
     public static final String SCHEMA_USER_TYPES_TABLE = "schema_usertypes";
     public static final String SCHEMA_FUNCTIONS_TABLE = "schema_functions";
+    public static final String SCHEMA_AGGREGATES_TABLE = "schema_aggregates";
 
     public static final String BUILT_INDEXES_TABLE = "IndexInfo";
     public static final String HINTS_TABLE = "hints";
@@ -95,7 +96,8 @@ public final class SystemKeyspace
                       SCHEMA_COLUMNS_TABLE,
                       SCHEMA_TRIGGERS_TABLE,
                       SCHEMA_USER_TYPES_TABLE,
-                      SCHEMA_FUNCTIONS_TABLE);
+                      SCHEMA_FUNCTIONS_TABLE,
+                      SCHEMA_AGGREGATES_TABLE);
 
     private static int WEEK = (int) TimeUnit.DAYS.toSeconds(7);
 
@@ -177,7 +179,6 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), type_name))")
                 .gcGraceSeconds(WEEK);
 
-
     public static final CFMetaData SchemaFunctionsTable =
         compile(SCHEMA_FUNCTIONS_TABLE, "user defined function definitions",
                 "CREATE TABLE %s ("
@@ -193,6 +194,21 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), function_name, signature))")
                 .gcGraceSeconds(WEEK);
 
+    public static final CFMetaData SchemaAggregatesTable =
+        compile(SCHEMA_AGGREGATES_TABLE, "user defined aggregate definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "aggregate_name text,"
+                + "signature blob,"
+                + "argument_types list<text>,"
+                + "return_type text,"
+                + "state_func text,"
+                + "state_type text,"
+                + "final_func text,"
+                + "initcond blob,"
+                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))")
+                .gcGraceSeconds(WEEK);
+
     public static final CFMetaData BuiltIndexesTable =
         compile(BUILT_INDEXES_TABLE, "built column indexes",
                 "CREATE TABLE \"%s\" ("
@@ -331,6 +347,7 @@ public final class SystemKeyspace
                           SchemaTriggersTable,
                           SchemaUserTypesTable,
                           SchemaFunctionsTable,
+                          SchemaAggregatesTable,
                           BuiltIndexesTable,
                           HintsTable,
                           BatchlogTable,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index bc67e8a..faaffb9 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -23,15 +23,18 @@ public interface IMigrationListener
     public void onCreateColumnFamily(String ksName, String cfName);
     public void onCreateUserType(String ksName, String typeName);
     public void onCreateFunction(String ksName, String functionName);
+    public void onCreateAggregate(String ksName, String aggregateName);
 
     public void onUpdateKeyspace(String ksName);
     public void onUpdateColumnFamily(String ksName, String cfName);
     public void onUpdateUserType(String ksName, String typeName);
     public void onUpdateFunction(String ksName, String functionName);
+    public void onUpdateAggregate(String ksName, String aggregateName);
 
     public void onDropKeyspace(String ksName);
     public void onDropColumnFamily(String ksName, String cfName);
     public void onDropUserType(String ksName, String typeName);
     public void onDropFunction(String ksName, String functionName);
+    public void onDropAggregate(String ksName, String aggregateName);
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 73bab66..c3fe1fa 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -39,11 +39,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.functions.AggregateFunction;
-import org.apache.cassandra.cql3.functions.ScalarFunction;
+import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -196,21 +194,22 @@ public class MigrationManager
             listener.onDropFunction(udf.name().keyspace, udf.name().name);
     }
 
-    private List<String> asString(List<AbstractType<?>> abstractTypes)
+    public void notifyCreateAggregate(UDAggregate udf)
     {
-        List<String> r = new ArrayList<>(abstractTypes.size());
-        for (AbstractType<?> abstractType : abstractTypes)
-            r.add(abstractType.asCQL3Type().toString());
-        return r;
+        for (IMigrationListener listener : listeners)
+            listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
+    }
+
+    public void notifyUpdateAggregate(UDAggregate udf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
     }
 
-    private String udType(UDFunction udf)
+    public void notifyDropAggregate(UDAggregate udf)
     {
-        if (udf instanceof ScalarFunction)
-            return "scalar";
-        if (udf instanceof AggregateFunction)
-            return "aggregate";
-        return "";
+        for (IMigrationListener listener : listeners)
+            listener.onDropAggregate(udf.name().keyspace, udf.name().name);
     }
 
     public void notifyUpdateKeyspace(KSMetaData ksm)
@@ -395,14 +394,28 @@ public class MigrationManager
     public static void announceFunctionDrop(UDFunction udf, boolean announceLocally)
     {
         Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros());
-        logger.info(String.format("Drop Function overload '%s' args '%s'", udf.name(), udf.argTypes()));
+        logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes()));
+        announce(mutation, announceLocally);
+    }
+
+    public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally)
+    {
+        Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros());
+        logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes()));
         announce(mutation, announceLocally);
     }
 
     public static void announceNewFunction(UDFunction udf, boolean announceLocally)
     {
         Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros());
-        logger.info(String.format("Create Function '%s'", udf.name()));
+        logger.info(String.format("Create scalar function '%s'", udf.name()));
+        announce(mutation, announceLocally);
+    }
+
+    public static void announceNewAggregate(UDAggregate udf, boolean announceLocally)
+    {
+        Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros());
+        logger.info(String.format("Create aggregate function '%s'", udf.name()));
         announce(mutation, announceLocally);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index cc071b1..5202a94 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -414,6 +414,10 @@ public class Server implements CassandraDaemon.Server
         {
         }
 
+        public void onCreateAggregate(String ksName, String aggregateName)
+        {
+        }
+
         public void onUpdateKeyspace(String ksName)
         {
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
@@ -433,6 +437,10 @@ public class Server implements CassandraDaemon.Server
         {
         }
 
+        public void onUpdateAggregate(String ksName, String aggregateName)
+        {
+        }
+
         public void onDropKeyspace(String ksName)
         {
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
@@ -451,5 +459,9 @@ public class Server implements CassandraDaemon.Server
         public void onDropFunction(String ksName, String functionName)
         {
         }
+
+        public void onDropAggregate(String ksName, String aggregateName)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/test/unit/org/apache/cassandra/cql3/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
index 859fe65..940e87f 100644
--- a/test/unit/org/apache/cassandra/cql3/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
@@ -24,8 +24,12 @@ import java.util.Date;
 import java.util.TimeZone;
 
 import org.apache.commons.lang3.time.DateUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
 public class AggregationTest extends CQLTester
 {
     @Test
@@ -94,16 +98,20 @@ public class AggregationTest extends CQLTester
     {
         createTable("CREATE TABLE %s (a int primary key, b timeuuid, c double, d double)");
 
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".copySign(magnitude double, sign double) RETURNS double LANGUAGE JAVA\n" +
-                "AS 'return Double.valueOf(Math.copySign(magnitude.doubleValue(), sign.doubleValue()));';");
+        String copySign = createFunction(KEYSPACE,
+                                         "double, double",
+                                         "CREATE OR REPLACE FUNCTION %s(magnitude double, sign double) " +
+                                         "RETURNS double " +
+                                         "LANGUAGE JAVA " +
+                                         "AS 'return Double.valueOf(Math.copySign(magnitude.doubleValue(), sign.doubleValue()));';");
 
         assertColumnNames(execute("SELECT max(a), max(unixTimestampOf(b)) FROM %s"), "system.max(a)", "system.max(system.unixtimestampof(b))");
         assertRows(execute("SELECT max(a), max(unixTimestampOf(b)) FROM %s"), row(null, null));
         assertColumnNames(execute("SELECT max(a), unixTimestampOf(max(b)) FROM %s"), "system.max(a)", "system.unixtimestampof(system.max(b))");
         assertRows(execute("SELECT max(a), unixTimestampOf(max(b)) FROM %s"), row(null, null));
 
-        assertColumnNames(execute("SELECT max(copySign(c, d)) FROM %s"), "system.max("+KEYSPACE+".copysign(c, d))");
-        assertRows(execute("SELECT max(copySign(c, d)) FROM %s"), row((Object) null));
+        assertColumnNames(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), "system.max(" + copySign + "(c, d))");
+        assertRows(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), row((Object) null));
 
         execute("INSERT INTO %s (a, b, c, d) VALUES (1, maxTimeuuid('2011-02-03 04:05:00+0000'), -1.2, 2.1)");
         execute("INSERT INTO %s (a, b, c, d) VALUES (2, maxTimeuuid('2011-02-03 04:06:00+0000'), 1.3, -3.4)");
@@ -117,10 +125,624 @@ public class AggregationTest extends CQLTester
         assertRows(execute("SELECT max(a), max(unixTimestampOf(b)) FROM %s"), row(3, date.getTime()));
         assertRows(execute("SELECT max(a), unixTimestampOf(max(b)) FROM %s"), row(3, date.getTime()));
 
-        assertRows(execute("SELECT copySign(max(c), min(c)) FROM %s"), row(-1.4));
-        assertRows(execute("SELECT copySign(c, d) FROM %s"), row(1.2), row(-1.3), row(1.4));
-        assertRows(execute("SELECT max(copySign(c, d)) FROM %s"), row(1.4));
-        assertInvalid("SELECT copySign(c, max(c)) FROM %s");
-        assertInvalid("SELECT copySign(max(c), c) FROM %s");
+        assertRows(execute("SELECT " + copySign + "(max(c), min(c)) FROM %s"), row(-1.4));
+        assertRows(execute("SELECT " + copySign + "(c, d) FROM %s"), row(1.2), row(-1.3), row(1.4));
+        assertRows(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), row(1.4));
+        assertInvalid("SELECT " + copySign + "(c, max(c)) FROM %s");
+        assertInvalid("SELECT " + copySign + "(max(c), c) FROM %s");
+    }
+
+    @Test
+    public void testDropStatements() throws Throwable
+    {
+        String f = createFunction(KEYSPACE,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+                                  "RETURNS double " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+        createFunctionOverload(f,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+                                  "RETURNS int " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+
+        // DROP AGGREGATE must not succeed against a scalar
+        assertInvalid("DROP AGGREGATE " + f);
+        assertInvalid("DROP AGGREGATE " + f + "(double, double)");
+
+        String a = createAggregate(KEYSPACE,
+                                   "double",
+                                   "CREATE OR REPLACE AGGREGATE %s(double) " +
+                                   "SFUNC " + shortFunctionName(f) + " " +
+                                   "STYPE double");
+        createAggregateOverload(a,
+                                "int",
+                                "CREATE OR REPLACE AGGREGATE %s(int) " +
+                                "SFUNC " + shortFunctionName(f) + " " +
+                                "STYPE int");
+
+        // DROP FUNCTION must not succeed against an aggregate
+        assertInvalid("DROP FUNCTION " + a);
+        assertInvalid("DROP FUNCTION " + a + "(double)");
+
+        // ambigious
+        assertInvalid("DROP AGGREGATE " + a);
+        assertInvalid("DROP AGGREGATE IF EXISTS " + a);
+
+        execute("DROP AGGREGATE IF EXISTS " + KEYSPACE + ".non_existing");
+        execute("DROP AGGREGATE IF EXISTS " + a + "(int, text)");
+
+        execute("DROP AGGREGATE " + a + "(double)");
+
+        execute("DROP AGGREGATE IF EXISTS " + a + "(double)");
+    }
+
+    @Test
+    public void testDropReferenced() throws Throwable
+    {
+        String f = createFunction(KEYSPACE,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+                                  "RETURNS double " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+
+        String a = createAggregate(KEYSPACE,
+                                   "double",
+                                   "CREATE OR REPLACE AGGREGATE %s(double) " +
+                                   "SFUNC " + shortFunctionName(f) + " " +
+                                   "STYPE double");
+
+        // DROP FUNCTION must not succeed because the function is still referenced by the aggregate
+        assertInvalid("DROP FUNCTION " + f);
+
+        execute("DROP AGGREGATE " + a + "(double)");
+    }
+
+    @Test
+    public void testJavaAggregateNoInit() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int " +
+                                   "FINALFUNC " + shortFunctionName(fFinal));
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("6"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateNullInitcond() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int " +
+                                   "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                   "INITCOND null");
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("6"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateInvalidInitcond() throws Throwable
+    {
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                      "INITCOND 'foobar'");
+    }
+
+    @Test
+    public void testJavaAggregateIncompatibleTypes() throws Throwable
+    {
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String fState2 = createFunction(KEYSPACE,
+                                        "int, int",
+                                        "CREATE FUNCTION %s(a double, b double) " +
+                                        "RETURNS double " +
+                                        "LANGUAGE java " +
+                                        "AS 'return Double.valueOf((a!=null?a.doubleValue():0d) + b.doubleValue());'");
+
+        String fFinal2 = createFunction(KEYSPACE,
+                                        "int",
+                                        "CREATE FUNCTION %s(a double) " +
+                                        "RETURNS text " +
+                                        "LANGUAGE java " +
+                                        "AS 'return a.toString();'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int");
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState2) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal2));
+    }
+
+    @Test
+    public void testJavaAggregateNonExistingFuncs() throws Throwable
+    {
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + "_not_there " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal) + "_not_there");
+
+        execute("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                "SFUNC " + shortFunctionName(fState) + " " +
+                "STYPE int " +
+                "FINALFUNC " + shortFunctionName(fFinal));
+        execute("DROP AGGREGATE " + KEYSPACE + ".aggrInvalid(int)");
+    }
+
+    @Test
+    public void testJavaAggregateFailingFuncs() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'throw new RuntimeException();'");
+
+        String fStateOK = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf(42);'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'throw new RuntimeException();'");
+
+        String fFinalOK = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return \"foobar\";'");
+
+        String a0 = createAggregate(KEYSPACE,
+                                    "int",
+                                    "CREATE AGGREGATE %s(int) " +
+                                    "SFUNC " + shortFunctionName(fState) + " " +
+                                    "STYPE int " +
+                                    "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                    "INITCOND null");
+        String a1 = createAggregate(KEYSPACE,
+                                    "int",
+                                    "CREATE AGGREGATE %s(int) " +
+                                    "SFUNC " + shortFunctionName(fStateOK) + " " +
+                                    "STYPE int " +
+                                    "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                    "INITCOND null");
+        String a2 = createAggregate(KEYSPACE,
+                                    "int",
+                                    "CREATE AGGREGATE %s(int) " +
+                                    "SFUNC " + shortFunctionName(fStateOK) + " " +
+                                    "STYPE int " +
+                                    "FINALFUNC " + shortFunctionName(fFinalOK) + " " +
+                                    "INITCOND null");
+
+        assertInvalid("SELECT " + a0 + "(b) FROM %s");
+        assertInvalid("SELECT " + a1 + "(b) FROM %s");
+        assertRows(execute("SELECT " + a2 + "(b) FROM %s"), row("foobar"));
+    }
+
+    @Test
+    public void testJavaAggregateWithoutStateOrFinal() throws Throwable
+    {
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".jSumFooNE1(int) " +
+                      "SFUNC jSumFooNEstate " +
+                      "STYPE int");
+
+        String f = createFunction(KEYSPACE,
+                                  "int, int",
+                                  "CREATE FUNCTION %s(a int, b int) " +
+                                  "RETURNS int " +
+                                  "LANGUAGE java " +
+                                  "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".jSumFooNE2(int) " +
+                      "SFUNC " + shortFunctionName(f) + " " +
+                      "STYPE int " +
+                      "FINALFUNC jSumFooNEfinal");
+
+        execute("DROP FUNCTION " + f + "(int, int)");
+    }
+
+    @Test
+    public void testJavaAggregate() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int " +
+                                   "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                   "INITCOND 42");
+
+        // 42 + 1 + 2 + 3 = 48
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("48"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fFinal + "(int)");
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateSimple() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int, int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int");
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row(6));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateComplex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        // build an average aggregation function using
+        // tuple<bigint,int> as state
+        // double as finaltype
+
+        String fState = createFunction(KEYSPACE,
+                                       "frozen<tuple<bigint, int>>, int",
+                                       "CREATE FUNCTION %s(a frozen<tuple<bigint, int>>, b int) " +
+                                       "RETURNS frozen<tuple<bigint, int>> " +
+                                       "LANGUAGE java " +
+                                       "AS '" +
+                                       "a.setLong(0, a.getLong(0) + b.intValue());" +
+                                       "a.setInt(1, a.getInt(1) + 1);" +
+                                       "return a;" +
+                                       "'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "frozen<tuple<bigint, int>>",
+                                       "CREATE FUNCTION %s(a frozen<tuple<bigint, int>>) " +
+                                       "RETURNS double " +
+                                       "LANGUAGE java " +
+                                       "AS '" +
+                                       "double r = a.getLong(0);" +
+                                       "r /= a.getInt(1);" +
+                                       "return Double.valueOf(r);" +
+                                       "'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE frozen<tuple<bigint, int>> "+
+                                   "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                   "INITCOND (0, 0)");
+
+        // 1 + 2 + 3 = 6 / 3 = 2
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row(2d));
+
+    }
+
+    @Test
+    public void testJavascriptAggregate() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE javascript " +
+                                       "AS 'a + b;'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE javascript " +
+                                       "AS '\"\"+a'");
+
+        String a = createFunction(KEYSPACE,
+                                  "int",
+                                  "CREATE AGGREGATE %s(int) " +
+                                  "SFUNC " + shortFunctionName(fState) + " " +
+                                  "STYPE int " +
+                                  "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                  "INITCOND 42");
+
+        // 42 + 1 + 2 + 3 = 48
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("48"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fFinal + "(int)");
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavascriptAggregateSimple() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE javascript " +
+                                       "AS 'a + b;'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int, int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int ");
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row(6));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testFunctionDropPreparedStatement() throws Throwable
+    {
+        String otherKS = "cqltest_foo";
+
+        execute("CREATE KEYSPACE IF NOT EXISTS " + otherKS + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+        try
+        {
+            execute("CREATE TABLE " + otherKS + ".jsdp (a int primary key, b int)");
+
+            String fState = createFunction(otherKS,
+                                           "int, int",
+                                           "CREATE FUNCTION %s(a int, b int) " +
+                                           "RETURNS int " +
+                                           "LANGUAGE javascript " +
+                                           "AS 'a + b;'");
+
+            String a = createAggregate(otherKS,
+                                       "int",
+                                       "CREATE AGGREGATE %s(int) " +
+                                       "SFUNC " + shortFunctionName(fState) + " " +
+                                       "STYPE int");
+
+            ResultMessage.Prepared prepared = QueryProcessor.prepare("SELECT " + a + "(b) FROM " + otherKS + ".jsdp", ClientState.forInternalCalls(), false);
+            Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+            execute("DROP AGGREGATE " + a + "(int)");
+            Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+            //
+
+            execute("CREATE AGGREGATE " + a + "(int) " +
+                    "SFUNC " + shortFunctionName(fState) + " " +
+                    "STYPE int");
+
+            prepared = QueryProcessor.prepare("SELECT " + a + "(b) FROM " + otherKS + ".jsdp", ClientState.forInternalCalls(), false);
+            Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+            execute("DROP KEYSPACE " + otherKS + ";");
+
+            Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+        }
+        finally
+        {
+            execute("DROP KEYSPACE IF EXISTS " + otherKS + ";");
+        }
+    }
+
+    @Test
+    public void testAggregatesReferencedInAggregates() throws Throwable
+    {
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE javascript " +
+                                       "AS 'a + b;'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int, int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int ");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " +
+                      "SFUNC " + shortFunctionName(a) + " " +
+                      "STYPE int ");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(a));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 2404237..883da3a 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
@@ -243,6 +244,19 @@ public abstract class CQLTester
         return USE_PREPARED_VALUES;
     }
 
+    public static FunctionName parseFunctionName(String qualifiedName)
+    {
+        int i = qualifiedName.indexOf('.');
+        return i == -1
+               ? FunctionName.nativeFunction(qualifiedName)
+               : new FunctionName(qualifiedName.substring(0, i).trim(), qualifiedName.substring(i+1).trim());
+    }
+
+    public static String shortFunctionName(String f)
+    {
+        return parseFunctionName(f).name;
+    }
+
     private static void removeAllSSTables(String ks, String table)
     {
         // clean up data directory which are stored as data directory/keyspace/data files

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/test/unit/org/apache/cassandra/cql3/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java
index 824719b..fa28126 100644
--- a/test/unit/org/apache/cassandra/cql3/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/UFTest.java
@@ -35,14 +35,6 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 public class UFTest extends CQLTester
 {
 
-    public static FunctionName parseFunctionName(String qualifiedName)
-    {
-        int i = qualifiedName.indexOf('.');
-        return i == -1
-               ? FunctionName.nativeFunction(qualifiedName)
-               : new FunctionName(qualifiedName.substring(0, i).trim(), qualifiedName.substring(i+1).trim());
-    }
-
     @Test
     public void testFunctionDropOnKeyspaceDrop() throws Throwable
     {


Mime
View raw message