cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [2/2] git commit: Support pure user-defined functions
Date Fri, 08 Aug 2014 19:13:13 GMT
Support pure user-defined functions

Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7395


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25411bf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25411bf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25411bf1

Branch: refs/heads/trunk
Commit: 25411bf1d15a35bf17002cf7664173357c6dc6cf
Parents: 2f25e6e
Author: Robert Stupp <snazy@snazy.de>
Authored: Fri Aug 8 14:11:01 2014 -0500
Committer: Tyler Hobbs <tyler@datastax.com>
Committed: Fri Aug 8 14:11:01 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 pylib/cqlshlib/cql3handling.py                  |   2 +-
 src/java/org/apache/cassandra/auth/Auth.java    |  12 +
 .../org/apache/cassandra/config/CFMetaData.java |  14 +-
 .../org/apache/cassandra/config/KSMetaData.java |   1 +
 .../org/apache/cassandra/config/UFMetaData.java | 309 +++++++++++++++++++
 .../cassandra/cql3/AssignementTestable.java     |   4 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |  73 ++++-
 .../org/apache/cassandra/cql3/TypeCast.java     |   2 +-
 .../cassandra/cql3/functions/FunctionCall.java  |  26 +-
 .../cassandra/cql3/functions/Functions.java     |   7 +-
 .../statements/CreateFunctionStatement.java     | 180 +++++++++++
 .../cql3/statements/DropFunctionStatement.java  |  94 ++++++
 .../statements/SchemaAlteringStatement.java     |   6 +-
 .../cassandra/cql3/statements/Selectable.java   |   9 +-
 .../cassandra/cql3/statements/Selection.java    |  21 +-
 .../cql3/udf/UDFFunctionOverloads.java          |  87 ++++++
 .../apache/cassandra/cql3/udf/UDFRegistry.java  | 146 +++++++++
 .../apache/cassandra/cql3/udf/UDFunction.java   | 178 +++++++++++
 .../org/apache/cassandra/db/DefsTables.java     |  83 +++++
 .../org/apache/cassandra/db/SystemKeyspace.java |  14 +-
 .../cassandra/service/CassandraDaemon.java      |   4 +
 .../cassandra/service/IMigrationListener.java   |   4 +
 .../cassandra/service/MigrationManager.java     |  42 +++
 .../org/apache/cassandra/transport/Event.java   |  40 +--
 .../org/apache/cassandra/transport/Server.java  |  12 +
 test/unit/org/apache/cassandra/cql3/UFTest.java | 186 +++++++++++
 28 files changed, 1517 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b33399b..f6285fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Support pure user-defined functions (CASSANDRA-7395)
  * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
  * Move sstable RandomAccessReader to nio2, which allows using the
    FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index e904ca2..057bf3d 100644
--- a/build.xml
+++ b/build.xml
@@ -1461,7 +1461,7 @@
     </java>
   </target>
 
-  <target name="javadoc" depends="init" description="Create javadoc">
+  <target name="javadoc" depends="init" description="Create javadoc" unless="no-javadoc">
     <create-javadoc destdir="${javadoc.dir}">
       <filesets>
       <fileset dir="${build.src.java}" defaultexcludes="yes">

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 72461db..d912c67 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -212,7 +212,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
 <mapLiteral> ::= "{" <term> ":" <term> ( "," <term> ":" <term> )* "}"
                ;
 
-<functionName> ::= <identifier>
+<functionName> ::= ( <identifier> ":" ":" )? <identifier>
                  ;
 
 <statementBody> ::= <useStatement>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 7c532b0..bc23d05 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -291,6 +291,10 @@ public class Auth
         {
         }
 
+        public void onDropFunction(String namespace, String functionName)
+        {
+        }
+
         public void onCreateKeyspace(String ksName)
         {
         }
@@ -303,6 +307,10 @@ public class Auth
         {
         }
 
+        public void onCreateFunction(String namespace, String functionName)
+        {
+        }
+
         public void onUpdateKeyspace(String ksName)
         {
         }
@@ -314,5 +322,9 @@ public class Auth
         public void onUpdateUserType(String ksName, String userType)
         {
         }
+
+        public void onUpdateFunction(String namespace, String functionName)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 5a347f7..37f586d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -213,6 +213,19 @@ public final class CFMetaData
                                                                + "PRIMARY KEY (keyspace_name, type_name)"
                                                                + ") WITH COMMENT='Defined user types' AND gc_grace_seconds=604800");
 
+    public static final CFMetaData SchemaFunctionsCf = compile("CREATE TABLE " + SystemKeyspace.SCHEMA_FUNCTIONS_CF + " ("
+                                                               + "namespace text,"
+                                                               + "name text,"
+                                                               + "signature text,"
+                                                               + "argument_names list<text>,"
+                                                               + "argument_types list<text>,"
+                                                               + "return_type text,"
+                                                               + "deterministic boolean,"
+                                                               + "language text,"
+                                                               + "body text,"
+                                                               + "primary key ((namespace, name), signature)"
+                                                               + ") WITH COMMENT='user defined functions' AND gc_grace_seconds=604800");
+
     public static final CFMetaData HintsCf = compile("CREATE TABLE " + SystemKeyspace.HINTS_CF + " ("
                                                      + "target_id uuid,"
                                                      + "hint_id timeuuid,"
@@ -331,7 +344,6 @@ public final class CFMetaData
                                                                  + "PRIMARY KEY (id)"
                                                                  + ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800");
 
-
     public static class SpeculativeRetry
     {
         public enum RetryType

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 8c99191..64ac3ff 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -101,6 +101,7 @@ public final class KSMetaData
                                                 CFMetaData.SchemaColumnsCf,
                                                 CFMetaData.SchemaTriggersCf,
                                                 CFMetaData.SchemaUserTypesCf,
+                                                CFMetaData.SchemaFunctionsCf,
                                                 CFMetaData.CompactionLogCf,
                                                 CFMetaData.CompactionHistoryCf,
                                                 CFMetaData.PaxosCf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/UFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UFMetaData.java b/src/java/org/apache/cassandra/config/UFMetaData.java
new file mode 100644
index 0000000..18484f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/UFMetaData.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CharStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.antlr.runtime.TokenStream;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.CqlLexer;
+import org.apache.cassandra.cql3.CqlParser;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.udf.UDFFunctionOverloads;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
+import org.apache.cassandra.db.CFRowAdder;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.SyntaxException;
+
+/**
+ * Defined (and loaded) user functions.
+ * <p/>
+ * In practice, because user functions are global, we have only one instance of
+ * this class that retrieve through the Schema class.
+ */
+public final class UFMetaData
+{
+    public final String namespace;
+    public final String functionName;
+    public final String qualifiedName;
+    public final String returnType;
+    public final List<String> argumentNames;
+    public final List<String> argumentTypes;
+    public final String language;
+    public final String body;
+    public final boolean deterministic;
+
+    public final String signature;
+    public final List<CQL3Type> cqlArgumentTypes;
+    public final CQL3Type cqlReturnType;
+
+    static final CompositeType partKey = (CompositeType) CFMetaData.SchemaFunctionsCf.getKeyValidator();
+
+    // TODO tracking "valid" status via an exception field is really bad style - but we need some way to mark a function as "dead"
+    public InvalidRequestException invalid;
+
+    public UFMetaData(String namespace, String functionName, boolean deterministic, List<String> argumentNames,
+                      List<String> argumentTypes, String returnType, String language, String body)
+    {
+        this.namespace = namespace != null ? namespace.toLowerCase() : "";
+        this.functionName = functionName.toLowerCase();
+        this.qualifiedName = qualifiedName(namespace, functionName);
+        this.returnType = returnType;
+        this.argumentNames = argumentNames;
+        this.argumentTypes = argumentTypes;
+        this.language = language == null ? "class" : language.toLowerCase();
+        this.body = body;
+        this.deterministic = deterministic;
+
+        this.cqlArgumentTypes = new ArrayList<>(argumentTypes.size());
+        InvalidRequestException inv = null;
+        CQL3Type rt = null;
+        try
+        {
+            rt = parseCQLType(returnType);
+            for (String argumentType : argumentTypes)
+                cqlArgumentTypes.add(parseCQLType(argumentType));
+        }
+        catch (InvalidRequestException e)
+        {
+            inv = e;
+        }
+        this.invalid = inv;
+        this.cqlReturnType = rt;
+
+        StringBuilder signature = new StringBuilder();
+        signature.append(qualifiedName);
+        for (String argumentType : argumentTypes)
+        {
+            signature.append(',');
+            signature.append(argumentType);
+        }
+        this.signature = signature.toString();
+    }
+
+    public boolean compatibleArgs(String ksName, String cfName, List<? extends AssignementTestable> providedArgs)
+    {
+        int cnt = cqlArgumentTypes.size();
+        if (cnt != providedArgs.size())
+            return false;
+        for (int i = 0; i < cnt; i++)
+        {
+            AssignementTestable provided = providedArgs.get(i);
+
+            if (provided == null)
+                continue;
+
+            AbstractType<?> argType = cqlArgumentTypes.get(i).getType();
+
+            ColumnSpecification expected = makeArgSpec(ksName, cfName, argType, i);
+            if (!provided.isAssignableTo(ksName, expected))
+                return false;
+        }
+
+        return true;
+    }
+
+    public ColumnSpecification makeArgSpec(String ksName, String cfName, AbstractType<?> argType, int i)
+    {
+        return new ColumnSpecification(ksName,
+                                       cfName,
+                                       new ColumnIdentifier("arg" + i + "(" + qualifiedName + ")", true), argType);
+    }
+
+    private static CQL3Type parseCQLType(String cqlType)
+    throws InvalidRequestException
+    {
+        CharStream stream = new ANTLRStringStream(cqlType);
+        CqlLexer lexer = new CqlLexer(stream);
+
+        TokenStream tokenStream = new CommonTokenStream(lexer);
+        CqlParser parser = new CqlParser(tokenStream);
+        try
+        {
+            CQL3Type.Raw rawType = parser.comparatorType();
+            // TODO CASSANDRA-7563 use appropiate keyspace here ... keyspace must be fully qualified
+            CQL3Type t = rawType.prepare(null);
+            // TODO CASSANDRA-7563 support "complex" types (UDT, tuples, collections), remove catch-NPE below
+            if (!(t instanceof CQL3Type.Native))
+                throw new InvalidRequestException("non-native CQL type '" + cqlType + "' not supported");
+            return t;
+        }
+        catch (NullPointerException | InvalidRequestException | RecognitionException e)
+        {
+            throw new InvalidRequestException("invalid CQL type '" + cqlType + "'");
+        }
+    }
+
+    public static String qualifiedName(String namespace, String functionName)
+    {
+        if (namespace == null)
+            return "::" + functionName;
+        return (namespace + "::" + functionName).toLowerCase();
+    }
+
+    public static Mutation dropFunction(long timestamp, String namespace, String functionName)
+    {
+        UDFFunctionOverloads sigMap = UDFRegistry.getFunctionSigMap(UFMetaData.qualifiedName(namespace, functionName));
+        if (sigMap == null || sigMap.isEmpty())
+            return null;
+
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, partKey.decompose(namespace, functionName));
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
+
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+        for (UFMetaData f : sigMap.values())
+            udfRemove(timestamp, cf, ldt, f);
+
+        return mutation;
+    }
+
+    private static Composite udfSignatureKey(UFMetaData function)
+    {
+        return CFMetaData.SchemaFunctionsCf.comparator.make(function.signature);
+    }
+
+    private static void udfRemove(long timestamp, ColumnFamily cf, int ldt, UFMetaData f)
+    {
+        Composite prefix = udfSignatureKey(f);
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+    }
+
+    public static Mutation createOrReplaceFunction(long timestamp, UFMetaData f)
+    throws ConfigurationException, SyntaxException
+    {
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, partKey.decompose(f.namespace, f.functionName));
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
+
+        Composite prefix = udfSignatureKey(f);
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
+
+        adder.resetCollection("argument_names");
+        adder.resetCollection("argument_types");
+        adder.add("name", f.functionName);
+        adder.add("return_type", f.returnType);
+        adder.add("language", f.language);
+        adder.add("body", f.body);
+        adder.add("deterministic", f.deterministic);
+
+        for (String argName : f.argumentNames)
+            adder.addListEntry("argument_names", argName);
+        for (String argType : f.argumentTypes)
+            adder.addListEntry("argument_types", argType);
+
+        return mutation;
+    }
+
+    public static UFMetaData fromSchema(UntypedResultSet.Row row)
+    {
+        String namespace = row.getString("namespace");
+        String name = row.getString("name");
+        List<String> argumentNames = row.getList("argument_names", UTF8Type.instance);
+        List<String> argumentTypes = row.getList("argument_types", UTF8Type.instance);
+        String returnType = row.getString("return_type");
+        boolean deterministic = row.getBoolean("deterministic");
+        String language = row.getString("language");
+        String body = row.getString("body");
+
+        return new UFMetaData(namespace, name, deterministic, argumentNames, argumentTypes, returnType, language, body);
+    }
+
+    public static Map<String, UFMetaData> fromSchema(Row row)
+    {
+        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_CF, row);
+        Map<String, UFMetaData> udfs = new HashMap<>(results.size());
+        for (UntypedResultSet.Row result : results)
+        {
+            UFMetaData udf = fromSchema(result);
+            udfs.put(udf.signature, udf);
+        }
+        return udfs;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        UFMetaData that = (UFMetaData) o;
+        if (!signature.equals(that.signature))
+            return false;
+        if (deterministic != that.deterministic)
+            return false;
+        if (argumentNames != null ? !argumentNames.equals(that.argumentNames) : that.argumentNames != null)
+            return false;
+        if (body != null ? !body.equals(that.body) : that.body != null)
+            return false;
+        if (!namespace.equals(that.namespace))
+            return false;
+        if (!language.equals(that.language))
+            return false;
+        if (returnType != null ? !returnType.equals(that.returnType) : that.returnType != null)
+            return false;
+
+        return true;
+    }
+
+    public int hashCode()
+    {
+        int result = signature.hashCode();
+        result = 31 * result + (returnType != null ? returnType.hashCode() : 0);
+        result = 31 * result + (argumentNames != null ? argumentNames.hashCode() : 0);
+        result = 31 * result + (argumentTypes.hashCode());
+        result = 31 * result + (language.hashCode());
+        result = 31 * result + (body != null ? body.hashCode() : 0);
+        result = 31 * result + (deterministic ? 1 : 0);
+        return result;
+    }
+
+    public String toString()
+    {
+        return new ToStringBuilder(this)
+               .append("signature", signature)
+               .append("returnType", returnType)
+               .append("deterministic", deterministic)
+               .append("language", language)
+               .append("body", body)
+               .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/AssignementTestable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AssignementTestable.java b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
index 2253cf7..02b3013 100644
--- a/src/java/org/apache/cassandra/cql3/AssignementTestable.java
+++ b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.cql3;
 
-import org.apache.cassandra.exceptions.InvalidRequestException;
-
 public interface AssignementTestable
 {
     /**
      * @return whether this object can be assigned to the provided receiver
      */
-    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException;
+    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 268bce5..96a668b 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -242,6 +242,8 @@ cqlStatement returns [ParsedStatement stmt]
     | st25=createTypeStatement         { $stmt = st25; }
     | st26=alterTypeStatement          { $stmt = st26; }
     | st27=dropTypeStatement           { $stmt = st27; }
+    | st28=createFunctionStatement     { $stmt = st28; }
+    | st29=dropFunctionStatement       { $stmt = st29; }
     ;
 
 /*
@@ -298,7 +300,8 @@ unaliasedSelector returns [Selectable s]
     :  ( c=cident                                  { tmp = c; }
        | K_WRITETIME '(' c=cident ')'              { tmp = new Selectable.WritetimeOrTTL(c, true); }
        | K_TTL       '(' c=cident ')'              { tmp = new Selectable.WritetimeOrTTL(c, false); }
-       | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(f, args); }
+       | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction("", f, args); }
+       | bn=udfName '::' fn=udfName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(bn, fn, args); }
        ) ( '.' fi=cident { tmp = new Selectable.WithFieldSelection(tmp, fi); } )* { $s = tmp; }
     ;
 
@@ -485,6 +488,48 @@ batchStatementObjective returns [ModificationStatement.Parsed statement]
     | d=deleteStatement  { $statement = d; }
     ;
 
+createFunctionStatement returns [CreateFunctionStatement expr]
+    @init {
+        boolean orReplace = false;
+        boolean ifNotExists = false;
+
+        boolean deterministic = true;
+        String language = "CLASS";
+        String bodyOrClassName = null;
+        List<CreateFunctionStatement.Argument> args = new ArrayList<CreateFunctionStatement.Argument>();
+    }
+    : K_CREATE (K_OR K_REPLACE { orReplace = true; })?
+      ((K_NON { deterministic = false; })? K_DETERMINISTIC)?
+      K_FUNCTION
+      (K_IF K_NOT K_EXISTS { ifNotExists = true; })?
+      ( bn=udfName '::' )?
+      fn=udfName
+      '('
+        (
+          k=cident v=comparatorType { args.add(new CreateFunctionStatement.Argument(k, v)); }
+          ( ',' k=cident v=comparatorType { args.add(new CreateFunctionStatement.Argument(k, v)); } )*
+        )?
+      ')'
+      K_RETURNS
+      rt=comparatorType
+      (
+          (                      { language="CLASS"; } cls = STRING_LITERAL { bodyOrClassName = $cls.text; } )
+        | ( K_LANGUAGE l = IDENT { language=$l.text; } K_BODY body = ((~K_END_BODY)*) { bodyOrClassName = $body.text; } K_END_BODY )
+      )
+      { $expr = new CreateFunctionStatement(bn, fn, language, bodyOrClassName, deterministic, rt, args, orReplace, ifNotExists); }
+    ;
+
+dropFunctionStatement returns [DropFunctionStatement expr]
+    @init {
+        boolean ifExists = false;
+    }
+    : K_DROP K_FUNCTION
+      (K_IF K_EXISTS { ifExists = true; } )?
+      ( bn=udfName '::' )?
+      fn=udfName
+      { $expr = new DropFunctionStatement(bn, fn, ifExists); }
+    ;
+
 /**
  * CREATE KEYSPACE [IF NOT EXISTS] <KEYSPACE> WITH attr1 = value1 AND attr2 = value2;
  */
@@ -917,6 +962,11 @@ functionName returns [String s]
     | K_TOKEN                       { $s = "token"; }
     ;
 
+udfName returns [String s]
+    : f=IDENT                       { $s = $f.text; }
+    | u=unreserved_function_keyword { $s = u; }
+    ;
+
 functionArgs returns [List<Term.Raw> a]
     : '(' ')' { $a = Collections.emptyList(); }
     | '(' t1=term { List<Term.Raw> args = new ArrayList<Term.Raw>(); args.add(t1); }
@@ -926,7 +976,8 @@ functionArgs returns [List<Term.Raw> a]
 
 term returns [Term.Raw term]
     : v=value                          { $term = v; }
-    | f=functionName args=functionArgs { $term = new FunctionCall.Raw(f, args); }
+    | f=functionName args=functionArgs { $term = new FunctionCall.Raw("", f, args); }
+    | bn=udfName '::' fn=udfName args=functionArgs { $term = new FunctionCall.Raw(bn, fn, args); }
     | '(' c=comparatorType ')' t=term  { $term = new TypeCast(c, t); }
     ;
 
@@ -1180,10 +1231,16 @@ basic_unreserved_keyword returns [String str]
         | K_DISTINCT
         | K_CONTAINS
         | K_STATIC
+        | K_FUNCTION
+        | K_RETURNS
+        | K_LANGUAGE
+        | K_NON
+        | K_DETERMINISTIC
+        | K_BODY
+        | K_END_BODY
         ) { $str = $k.text; }
     ;
 
-
 // Case-insensitive keywords
 K_SELECT:      S E L E C T;
 K_FROM:        F R O M;
@@ -1287,6 +1344,16 @@ K_TUPLE:       T U P L E;
 K_TRIGGER:     T R I G G E R;
 K_STATIC:      S T A T I C;
 
+K_FUNCTION:    F U N C T I O N;
+K_RETURNS:     R E T U R N S;
+K_LANGUAGE:    L A N G U A G E;
+K_NON:         N O N;
+K_OR:          O R;
+K_REPLACE:     R E P L A C E;
+K_DETERMINISTIC: D E T E R M I N I S T I C;
+K_END_BODY:    E N D '_' B O D Y;
+K_BODY:        B O D Y;
+
 // Case-insensitive alpha characters
 fragment A: ('a'|'A');
 fragment B: ('b'|'B');

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/TypeCast.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TypeCast.java b/src/java/org/apache/cassandra/cql3/TypeCast.java
index e325e4d..3250e3b 100644
--- a/src/java/org/apache/cassandra/cql3/TypeCast.java
+++ b/src/java/org/apache/cassandra/cql3/TypeCast.java
@@ -46,7 +46,7 @@ public class TypeCast implements Term.Raw
         return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.prepare(keyspace).getType());
     }
 
-    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
+    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index a0c7447..fe2c2ee 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.udf.UDFunction;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.ListType;
@@ -93,18 +95,33 @@ public class FunctionCall extends Term.NonTerminal
 
     public static class Raw implements Term.Raw
     {
+        private final String namespace;
         private final String functionName;
         private final List<Term.Raw> terms;
 
-        public Raw(String functionName, List<Term.Raw> terms)
+        public Raw(String namespace, String functionName, List<Term.Raw> terms)
         {
+            this.namespace = namespace;
             this.functionName = functionName;
             this.terms = terms;
         }
 
         public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            Function fun = Functions.get(keyspace, functionName, terms, receiver);
+            Function fun = null;
+            if (namespace.isEmpty())
+                fun = Functions.get(keyspace, functionName, terms, receiver);
+
+            if (fun == null)
+            {
+                UDFunction udf = UDFRegistry.resolveFunction(namespace, functionName, receiver.ksName, receiver.cfName, terms);
+                if (udf != null)
+                    // got a user defined function to call
+                    fun = udf.create(terms);
+            }
+
+            if (fun == null)
+                throw new InvalidRequestException(String.format("Unknown function %s called", namespace.isEmpty() ? functionName : namespace + "::" + functionName));
 
             List<Term> parameters = new ArrayList<Term>(terms.size());
             boolean allTerminal = true;
@@ -149,10 +166,13 @@ public class FunctionCall extends Term.NonTerminal
         public String toString()
         {
             StringBuilder sb = new StringBuilder();
+            if (!namespace.isEmpty())
+                sb.append(namespace).append("::");
             sb.append(functionName).append("(");
             for (int i = 0; i < terms.size(); i++)
             {
-                if (i > 0) sb.append(", ");
+                if (i > 0)
+                    sb.append(", ");
                 sb.append(terms.get(i));
             }
             return sb.append(")").toString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 605e7b3..03dd13d 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -62,6 +62,11 @@ public abstract class Functions
         declared.put("blobasvarchar", AbstractFunction.factory(BytesConversionFcts.BlobAsVarcharFact));
     }
 
+    public static boolean contains(String functionName)
+    {
+        return declared.containsKey(functionName);
+    }
+
     public static AbstractType<?> getReturnType(String functionName, String ksName, String cfName)
     {
         List<Function.Factory> factories = declared.get(functionName.toLowerCase());
@@ -82,7 +87,7 @@ public abstract class Functions
     {
         List<Function.Factory> factories = declared.get(name.toLowerCase());
         if (factories.isEmpty())
-            throw new InvalidRequestException(String.format("Unknown CQL3 function %s called", name));
+            return null;
 
         // Fast path if there is not choice
         if (factories.size() == 1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
new file mode 100644
index 0000000..094c318
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.Event;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * A <code>CREATE FUNCTION</code> statement parsed from a CQL query.
+ */
+public final class CreateFunctionStatement extends SchemaAlteringStatement
+{
+    final boolean orReplace;
+    final boolean ifNotExists;
+    final String namespace;
+    final String functionName;
+    final String qualifiedName;
+    final String language;
+    final String body;
+    final boolean deterministic;
+    final CQL3Type.Raw returnType;
+    final List<Argument> arguments;
+
+    private UFMetaData ufMeta;
+
+    public CreateFunctionStatement(String namespace, String functionName, String language, String body, boolean deterministic,
+                                   CQL3Type.Raw returnType, List<Argument> arguments, boolean orReplace, boolean ifNotExists)
+    {
+        super();
+        this.namespace = namespace != null ? namespace : "";
+        this.functionName = functionName;
+        this.qualifiedName = UFMetaData.qualifiedName(namespace, functionName);
+        this.language = language;
+        this.body = body;
+        this.deterministic = deterministic;
+        this.returnType = returnType;
+        this.arguments = arguments;
+        assert functionName != null : "null function name";
+        assert language != null : "null function language";
+        assert body != null : "null function body";
+        assert returnType != null : "null function returnType";
+        assert arguments != null : "null function arguments";
+        this.orReplace = orReplace;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasAllKeyspacesAccess(Permission.CREATE);
+    }
+
+    /**
+     * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
+     * from these statements, so this method is responsible for processing and
+     * validating.
+     *
+     * @throws org.apache.cassandra.exceptions.InvalidRequestException if arguments are missing or unacceptable
+     */
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        if (!namespace.isEmpty() && !namespace.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+        if (!functionName.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+        if (namespace.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF namespace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+        if (functionName.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF function names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
+    {
+        try
+        {
+            doExecute();
+            return super.executeInternal(state, options);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
+    {
+        doExecute();
+        return super.execute(state, options);
+    }
+
+    private void doExecute() throws RequestValidationException
+    {
+        boolean exists = UDFRegistry.hasFunction(qualifiedName);
+        if (exists && ifNotExists)
+            throw new InvalidRequestException(String.format("Function '%s' already exists.", qualifiedName));
+        if (exists && !orReplace)
+            throw new InvalidRequestException(String.format("Function '%s' already exists.", qualifiedName));
+
+        if (namespace.isEmpty() && Functions.contains(functionName))
+            throw new InvalidRequestException(String.format("Function name '%s' is reserved by CQL.", qualifiedName));
+
+        List<Argument> args = arguments;
+        List<String> argumentNames = new ArrayList<>(args.size());
+        List<String> argumentTypes = new ArrayList<>(args.size());
+        for (Argument arg : args)
+        {
+            argumentNames.add(arg.getName().toString());
+            argumentTypes.add(arg.getType().toString());
+        }
+        this.ufMeta = new UFMetaData(namespace, functionName, deterministic, argumentNames, argumentTypes,
+                                     returnType.toString(), language, body);
+
+        UDFRegistry.tryCreateFunction(ufMeta);
+    }
+
+    public void announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        MigrationManager.announceNewFunction(ufMeta, isLocalOnly);
+    }
+
+    public static final class Argument
+    {
+        final ColumnIdentifier name;
+        final CQL3Type.Raw type;
+
+        public Argument(ColumnIdentifier name, CQL3Type.Raw type)
+        {
+            this.name = name;
+            this.type = type;
+        }
+
+        public ColumnIdentifier getName()
+        {
+            return name;
+        }
+
+        public CQL3Type.Raw getType()
+        {
+            return type;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
new file mode 100644
index 0000000..7627ab4
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+/**
+ * A <code>DROP FUNCTION</code> statement parsed from a CQL query.
+ */
+public final class DropFunctionStatement extends SchemaAlteringStatement
+{
+    private final String namespace;
+    private final String functionName;
+    private final String qualifiedName;
+    private final boolean ifExists;
+
+    public DropFunctionStatement(String namespace, String functionName, boolean ifExists)
+    {
+        super();
+        this.namespace = namespace == null ? "" : namespace;
+        this.functionName = functionName;
+        this.qualifiedName = UFMetaData.qualifiedName(namespace, functionName);
+        this.ifExists = ifExists;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasAllKeyspacesAccess(Permission.DROP);
+    }
+
+    /**
+     * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
+     * from these statements, so this method is responsible for processing and
+     * validating.
+     *
+     * @throws org.apache.cassandra.exceptions.InvalidRequestException if arguments are missing or unacceptable
+     */
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        if (!namespace.isEmpty() && !namespace.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+        if (!functionName.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+        if (namespace.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF namespace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+        if (functionName.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF function names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    // no execute() - drop propagated via MigrationManager
+
+    public void announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        try
+        {
+            MigrationManager.announceFunctionDrop(namespace, functionName, isLocalOnly);
+        }
+        catch (InvalidRequestException e)
+        {
+            if (!ifExists)
+                throw e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index e70aac9..876568a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -70,7 +70,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
     public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
     {
         announceMigration(false);
-        return new ResultMessage.SchemaChange(changeEvent());
+        Event.SchemaChange ce = changeEvent();
+        return ce == null ? new ResultMessage.Void() : new ResultMessage.SchemaChange(ce);
     }
 
     public ResultMessage executeInternal(QueryState state, QueryOptions options)
@@ -78,7 +79,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
         try
         {
             announceMigration(true);
-            return new ResultMessage.SchemaChange(changeEvent());
+            Event.SchemaChange ce = changeEvent();
+            return ce == null ? new ResultMessage.Void() : new ResultMessage.SchemaChange(ce);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/Selectable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selectable.java b/src/java/org/apache/cassandra/cql3/statements/Selectable.java
index 448301c..ab0a5a3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selectable.java
@@ -44,11 +44,13 @@ public interface Selectable
 
     public static class WithFunction implements Selectable
     {
+        public final String namespace;
         public final String functionName;
         public final List<Selectable> args;
 
-        public WithFunction(String functionName, List<Selectable> args)
+        public WithFunction(String namespace, String functionName, List<Selectable> args)
         {
+            this.namespace = namespace;
             this.functionName = functionName;
             this.args = args;
         }
@@ -57,10 +59,13 @@ public interface Selectable
         public String toString()
         {
             StringBuilder sb = new StringBuilder();
+            if (!namespace.isEmpty())
+                sb.append(namespace).append("::");
             sb.append(functionName).append("(");
             for (int i = 0; i < args.size(); i++)
             {
-                if (i > 0) sb.append(", ");
+                if (i > 0)
+                    sb.append(", ");
                 sb.append(args.get(i));
             }
             return sb.append(")").toString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 0f0cb62..325ef15 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.udf.UDFunction;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.CounterCell;
 import org.apache.cassandra.db.ExpiringCell;
@@ -156,13 +158,26 @@ public abstract class Selection
         else
         {
             Selectable.WithFunction withFun = (Selectable.WithFunction)raw.selectable;
-            List<Selector> args = new ArrayList<Selector>(withFun.args.size());
+            List<Selector> args = new ArrayList<>(withFun.args.size());
             for (Selectable rawArg : withFun.args)
                 args.add(makeSelector(cfm, new RawSelector(rawArg, null), defs, null));
 
+            // resolve built-in functions before user defined functions
             AbstractType<?> returnType = Functions.getReturnType(withFun.functionName, cfm.ksName, cfm.cfName);
             if (returnType == null)
-                throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.functionName));
+            {
+                UDFunction userFun = UDFRegistry.resolveFunction(withFun.namespace, withFun.functionName, cfm.ksName, cfm.cfName, args);
+                if (userFun != null)
+                {
+                    // got a user defined function to call
+                    Function fun = userFun.create(args);
+                    ColumnSpecification spec = makeFunctionSpec(cfm, withFun, fun.returnType(), raw.alias);
+                    if (metadata != null)
+                        metadata.add(spec);
+                    return new FunctionSelector(userFun.create(args), args);
+                }
+                throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.namespace.isEmpty() ? withFun.functionName : withFun.namespace + "::" + withFun.functionName));
+            }
             ColumnSpecification spec = makeFunctionSpec(cfm, withFun, returnType, raw.alias);
             Function fun = Functions.get(cfm.ksName, withFun.functionName, args, spec);
             if (metadata != null)
@@ -193,7 +208,7 @@ public abstract class Selection
                                                         ColumnIdentifier alias) throws InvalidRequestException
     {
         if (returnType == null)
-            throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.functionName));
+            throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.namespace.isEmpty() ? fun.functionName : fun.namespace +"::"+fun.functionName));
 
         return new ColumnSpecification(cfm.ksName,
                                        cfm.cfName,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java b/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java
new file mode 100644
index 0000000..aa6892a
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.udf;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+public final class UDFFunctionOverloads
+{
+    final Map<String, UFMetaData> signatureMap = new ConcurrentHashMap<>();
+    final Map<String, UDFunction> udfInstances = new ConcurrentHashMap<>();
+
+    public void addAndInit(UFMetaData uf, boolean addIfInvalid)
+    {
+        try
+        {
+            UDFunction UDFunction = new UDFunction(uf);
+            udfInstances.put(uf.signature, UDFunction);
+        }
+        catch (InvalidRequestException e)
+        {
+            uf.invalid = e;
+        }
+
+        if (uf.invalid == null || addIfInvalid)
+            signatureMap.put(uf.signature, uf);
+    }
+
+    public void remove(UFMetaData uf)
+    {
+        signatureMap.remove(uf.signature);
+        udfInstances.remove(uf.signature);
+    }
+
+    public Collection<UFMetaData> values()
+    {
+        return signatureMap.values();
+    }
+
+    public boolean isEmpty()
+    {
+        return signatureMap.isEmpty();
+    }
+
+    public UDFunction resolveFunction(String ksName, String cfName, List<? extends AssignementTestable> args)
+    throws InvalidRequestException
+    {
+        for (UFMetaData candidate : signatureMap.values())
+        {
+            // Currently the UDF implementation must use concrete types (like Double, Integer) instead of base types (like Number).
+            // To support handling of base types it is necessary to construct new, temporary instances of UDFFunction with the
+            // signature for the current request in UDFFunction#argsType + UDFFunction#returnType.
+            // Additionally we need the requested return type (AssignementTestable) has a parameter for this method.
+            if (candidate.compatibleArgs(ksName, cfName, args))
+            {
+
+                // TODO CASSANDRA-7557 (specific per-function EXECUTE permission ??)
+
+                if (candidate.invalid != null)
+                    throw new InvalidRequestException(candidate.invalid.getMessage());
+                return udfInstances.get(candidate.signature);
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java b/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
new file mode 100644
index 0000000..cb3f1a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.udf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Central registry for user defined functions (CASSANDRA-7395).
+ * <p/>
+ * UDFs are maintained in {@code system.schema_functions} table and distributed to all nodes.
+ * <p/>
+ * UDFs are not maintained in {@link org.apache.cassandra.cql3.functions.Functions} class to have a strict
+ * distinction between 'core CQL' functions provided by Cassandra and functions provided by the user.
+ * 'Core CQL' functions have precedence over UDFs.
+ */
+public class UDFRegistry
+{
+    private static final Logger logger = LoggerFactory.getLogger(UDFRegistry.class);
+
+    static final String SELECT_CQL = "SELECT namespace, name, signature, deterministic, argument_names, argument_types, " +
+                                     "return_type, language, body FROM " +
+                                     Keyspace.SYSTEM_KS + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_CF;
+
+    private static final Map<String, UDFFunctionOverloads> functions = new ConcurrentHashMap<>();
+
+    public static void init()
+    {
+        refreshInitial();
+    }
+
+    /**
+     * Initial loading of all existing UDFs.
+     */
+    public static void refreshInitial()
+    {
+        logger.debug("Refreshing UDFs");
+        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_CQL))
+        {
+            UFMetaData uf = UFMetaData.fromSchema(row);
+            UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+            if (sigMap == null)
+                functions.put(uf.qualifiedName, sigMap = new UDFFunctionOverloads());
+
+            if (Functions.contains(uf.qualifiedName))
+                logger.warn("The UDF '" + uf.functionName + "' cannot be used because it uses the same name as the CQL " +
+                            "function with the same name. You should drop this function but can do a " +
+                            "'DESCRIBE FUNCTION "+uf.functionName+";' in cqlsh before to get more information about it.");
+
+            // add the function to the registry even if it is invalid (to be able to drop it)
+            sigMap.addAndInit(uf, true);
+
+            if (uf.invalid != null)
+                logger.error("Loaded invalid UDF : " + uf.invalid.getMessage());
+        }
+    }
+
+    public static boolean hasFunction(String qualifiedName)
+    {
+        UDFFunctionOverloads sigMap = functions.get(qualifiedName.toLowerCase());
+        return sigMap != null && !sigMap.isEmpty();
+    }
+
+    public static UDFunction resolveFunction(String namespace, String functionName, String ksName, String cfName,
+                                             List<? extends AssignementTestable> args)
+    throws InvalidRequestException
+    {
+        UDFFunctionOverloads sigMap = functions.get(UFMetaData.qualifiedName(namespace, functionName));
+        if (sigMap != null)
+            return sigMap.resolveFunction(ksName, cfName, args);
+        return null;
+    }
+
+    public static void migrateDropFunction(UFMetaData uf)
+    {
+        UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+        if (sigMap == null)
+            return;
+
+        sigMap.remove(uf);
+    }
+
+    public static void migrateUpdateFunction(UFMetaData uf)
+    {
+        migrateAddFunction(uf);
+    }
+
+    public static void migrateAddFunction(UFMetaData uf)
+    {
+        addFunction(uf, true);
+    }
+
+    /**
+     * Used by {@link org.apache.cassandra.cql3.statements.CreateFunctionStatement} to create or replace a new function.
+     */
+    public static void tryCreateFunction(UFMetaData ufMeta) throws InvalidRequestException
+    {
+        addFunction(ufMeta, false);
+
+        if (ufMeta.invalid != null)
+            throw ufMeta.invalid;
+    }
+
+    private static void addFunction(UFMetaData uf, boolean addIfInvalid)
+    {
+        UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+        if (sigMap == null)
+            functions.put(uf.qualifiedName, sigMap = new UDFFunctionOverloads());
+
+        sigMap.addAndInit(uf, addIfInvalid);
+    }
+
+    public static UDFFunctionOverloads getFunctionSigMap(String qualifiedName)
+    {
+        return functions.get(qualifiedName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFunction.java b/src/java/org/apache/cassandra/cql3/udf/UDFunction.java
new file mode 100644
index 0000000..4866c22
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.udf;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * UDFunction contains the <i>invokable</i> instance of a user defined function.
+ * Currently (as of CASSANDRA-7395) only {@code public static} methods in a {@link public} class
+ * can be invoked.
+ * CASSANDRA-7562 will introduce Java source code UDFs and CASSANDRA-7526 will introduce JSR-223 scripting languages.
+ * Invocations of UDFs are routed via this class.
+ */
+public class UDFunction
+{
+    private static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
+
+    public final UFMetaData meta;
+
+    public final Method method;
+
+    UDFunction(UFMetaData meta) throws InvalidRequestException
+    {
+        this.meta = meta;
+
+        Method m;
+        switch (meta.language)
+        {
+            case "class":
+                m = resolveClassMethod();
+                break;
+            default:
+                throw new InvalidRequestException("Invalid UDF language " + meta.language + " for '" + meta.qualifiedName + '\'');
+        }
+        this.method = m;
+    }
+
+    private Method resolveClassMethod() throws InvalidRequestException
+    {
+        Class<?> jReturnType = meta.cqlReturnType.getType().getSerializer().getType();
+        Class<?> paramTypes[] = new Class[meta.cqlArgumentTypes.size()];
+        for (int i = 0; i < paramTypes.length; i++)
+            paramTypes[i] = meta.cqlArgumentTypes.get(i).getType().getSerializer().getType();
+
+        String className;
+        String methodName;
+        int i = meta.body.indexOf('#');
+        if (i != -1)
+        {
+            methodName = meta.body.substring(i + 1);
+            className = meta.body.substring(0, i);
+        }
+        else
+        {
+            methodName = meta.functionName;
+            className = meta.body;
+        }
+        try
+        {
+            Class<?> cls = Class.forName(className, false, Thread.currentThread().getContextClassLoader());
+
+            Method method = cls.getMethod(methodName, paramTypes);
+
+            if (!jReturnType.isAssignableFrom(method.getReturnType()))
+            {
+                throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") " +
+                                                  "has incompatible return type " + method.getReturnType() + " (not assignable to " + jReturnType + ')');
+            }
+
+            return method;
+        }
+        catch (ClassNotFoundException e)
+        {
+            throw new InvalidRequestException("Class " + className + " does not exist");
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") does not exist");
+        }
+    }
+
+    public Function create(List<? extends AssignementTestable> providedArgs) throws InvalidRequestException
+    {
+        final int argCount = providedArgs.size();
+        final List<AbstractType<?>> argsType = new ArrayList<>(argCount);
+        final AbstractType<?> returnType = meta.cqlReturnType.getType();
+        for (int i = 0; i < argCount; i++)
+        {
+            AbstractType<?> argType = meta.cqlArgumentTypes.get(i).getType();
+            argsType.add(argType);
+        }
+
+        return new Function()
+        {
+            public String name()
+            {
+                return meta.qualifiedName;
+            }
+
+            public List<AbstractType<?>> argsType()
+            {
+                return argsType;
+            }
+
+            public AbstractType<?> returnType()
+            {
+                return returnType;
+            }
+
+            public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+            {
+                Object[] parms = new Object[argCount];
+                for (int i = 0; i < parms.length; i++)
+                {
+                    ByteBuffer bb = parameters.get(i);
+                    if (bb != null)
+                    {
+                        AbstractType<?> argType = argsType.get(i);
+                        parms[i] = argType.compose(bb);
+                    }
+                }
+
+                Object result;
+                try
+                {
+                    result = method.invoke(null, parms);
+                    @SuppressWarnings("unchecked") ByteBuffer r = result != null ? ((AbstractType) returnType).decompose(result) : null;
+                    return r;
+                }
+                catch (InvocationTargetException e)
+                {
+                    Throwable c = e.getCause();
+                    logger.error("Invocation of UDF {} failed", meta.qualifiedName, c);
+                    throw new InvalidRequestException("Invocation of UDF " + meta.qualifiedName + " failed: " + c);
+                }
+                catch (IllegalAccessException e)
+                {
+                    throw new InvalidRequestException("UDF " + meta.qualifiedName + " invocation failed: " + e);
+                }
+            }
+
+            public boolean isPure()
+            {
+                return meta.deterministic;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index fc43c27..33a112a 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -24,6 +24,8 @@ import java.util.*;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -177,6 +179,7 @@ public class DefsTables
         Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF, keyspaces);
         Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspaces);
         Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
 
         for (Mutation mutation : mutations)
             mutation.apply();
@@ -188,10 +191,12 @@ public class DefsTables
         Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF, keyspaces);
         Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspaces);
         Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
         mergeTypes(oldTypes, newTypes);
+        mergeFunctions(oldFunctions, newFunctions);
 
         // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
         for (String keyspaceToDrop : keyspacesToDrop)
@@ -377,6 +382,54 @@ public class DefsTables
         }
     }
 
+    private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
+    {
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
+
+        // New namespace with functions
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+        {
+            ColumnFamily cfFunctions = entry.getValue();
+            if (!cfFunctions.hasColumns())
+                continue;
+
+            for (UFMetaData uf : UFMetaData.fromSchema(new Row(entry.getKey(), cfFunctions)).values())
+                addFunction(uf);
+        }
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntry : diff.entriesDiffering().entrySet())
+        {
+            DecoratedKey namespace = modifiedEntry.getKey();
+            ColumnFamily prevCFFunctions = modifiedEntry.getValue().leftValue(); // state before external modification
+            ColumnFamily newCFFunctions = modifiedEntry.getValue().rightValue(); // updated state
+
+            if (!prevCFFunctions.hasColumns()) // whole namespace was deleted and now it's re-created
+            {
+                for (UFMetaData uf : UFMetaData.fromSchema(new Row(namespace, newCFFunctions)).values())
+                    addFunction(uf);
+            }
+            else if (!newCFFunctions.hasColumns()) // whole namespace is deleted
+            {
+                for (UFMetaData uf : UFMetaData.fromSchema(new Row(namespace, prevCFFunctions)).values())
+                    dropFunction(uf);
+            }
+            else // has modifications in the functions, need to perform nested diff to determine what was really changed
+            {
+                MapDifference<String, UFMetaData> functionsDiff = Maps.difference(UFMetaData.fromSchema(new Row(namespace, prevCFFunctions)),
+                    UFMetaData.fromSchema(new Row(namespace, newCFFunctions)));
+
+                for (UFMetaData function : functionsDiff.entriesOnlyOnRight().values())
+                    addFunction(function);
+
+                for (UFMetaData function : functionsDiff.entriesOnlyOnLeft().values())
+                    dropFunction(function);
+
+                for (MapDifference.ValueDifference<UFMetaData> tdiff : functionsDiff.entriesDiffering().values())
+                    updateFunction(tdiff.rightValue()); // use the most recent value
+            }
+        }
+    }
+
     private static void addKeyspace(KSMetaData ksm)
     {
         assert Schema.instance.getKSMetaData(ksm.name) == null;
@@ -425,6 +478,16 @@ public class DefsTables
             MigrationManager.instance.notifyCreateUserType(ut);
     }
 
+    private static void addFunction(UFMetaData uf)
+    {
+        logger.info("Loading {}", uf);
+
+        UDFRegistry.migrateAddFunction(uf);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyCreateFunction(uf);
+    }
+
     private static void updateKeyspace(KSMetaData newState)
     {
         KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
@@ -467,6 +530,16 @@ public class DefsTables
             MigrationManager.instance.notifyUpdateUserType(ut);
     }
 
+    private static void updateFunction(UFMetaData uf)
+    {
+        logger.info("Updating {}", uf);
+
+        UDFRegistry.migrateUpdateFunction(uf);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyUpdateFunction(uf);
+    }
+
     private static void dropKeyspace(String ksName)
     {
         KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -546,6 +619,16 @@ public class DefsTables
             MigrationManager.instance.notifyDropUserType(ut);
     }
 
+    private static void dropFunction(UFMetaData uf)
+    {
+        logger.info("Drop {}", uf);
+
+        UDFRegistry.migrateDropFunction(uf);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyDropFunction(uf);
+    }
+
     private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
     {
         // clone ksm but do not include the new def

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 3c647b6..8b62740 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -80,6 +80,7 @@ public class SystemKeyspace
     public static final String SCHEMA_COLUMNS_CF = "schema_columns";
     public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
     public static final String SCHEMA_USER_TYPES_CF = "schema_usertypes";
+    public static final String SCHEMA_FUNCTIONS_CF = "schema_functions";
     public static final String COMPACTION_LOG = "compactions_in_progress";
     public static final String PAXOS_CF = "paxos";
     public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
@@ -91,7 +92,8 @@ public class SystemKeyspace
                                                                   SCHEMA_COLUMNFAMILIES_CF,
                                                                   SCHEMA_COLUMNS_CF,
                                                                   SCHEMA_TRIGGERS_CF,
-                                                                  SCHEMA_USER_TYPES_CF);
+                                                                  SCHEMA_USER_TYPES_CF,
+                                                                  SCHEMA_FUNCTIONS_CF);
 
     private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
 
@@ -769,6 +771,16 @@ public class SystemKeyspace
         }
     }
 
+    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
+    {
+        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
+
+        for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
+            schema.put(schemaEntity.key, schemaEntity.cf);
+
+        return schema;
+    }
+
     public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces)
     {
         Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5c88cb1..71cba23 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -33,6 +33,7 @@ import javax.management.StandardMBean;
 
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -372,6 +373,9 @@ public class CassandraDaemon
         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
             waitForGossipToSettle();
 
+        // UDF
+        UDFRegistry.init();
+
         // Thift
         InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
         int rpcPort = DatabaseDescriptor.getRpcPort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index 4d142bd..b4eb392 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -22,12 +22,16 @@ public interface IMigrationListener
     public void onCreateKeyspace(String ksName);
     public void onCreateColumnFamily(String ksName, String cfName);
     public void onCreateUserType(String ksName, String typeName);
+    public void onCreateFunction(String namespace, String functionName);
 
     public void onUpdateKeyspace(String ksName);
     public void onUpdateColumnFamily(String ksName, String cfName);
     public void onUpdateUserType(String ksName, String typeName);
+    public void onUpdateFunction(String namespace, String functionName);
 
     public void onDropKeyspace(String ksName);
     public void onDropColumnFamily(String ksName, String cfName);
     public void onDropUserType(String ksName, String typeName);
+    public void onDropFunction(String namespace, String functionName);
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 5dd2534..28e3e39 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -29,6 +29,9 @@ import java.util.concurrent.*;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.SyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +176,24 @@ public class MigrationManager
             listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
     }
 
+    public void notifyCreateFunction(UFMetaData uf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onCreateFunction(uf.namespace, uf.functionName);
+    }
+
+    public void notifyUpdateFunction(UFMetaData uf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateFunction(uf.namespace, uf.functionName);
+    }
+
+    public void notifyDropFunction(UFMetaData uf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropFunction(uf.namespace, uf.functionName);
+    }
+
     public void notifyUpdateKeyspace(KSMetaData ksm)
     {
         for (IMigrationListener listener : listeners)
@@ -352,6 +373,27 @@ public class MigrationManager
         announce(addSerializedKeyspace(UTMetaData.dropFromSchema(droppedType, FBUtilities.timestampMicros()), droppedType.keyspace), announceLocally);
     }
 
+    public static void announceFunctionDrop(String namespace, String functionName, boolean announceLocally) throws InvalidRequestException
+    {
+        Mutation mutation = UFMetaData.dropFunction(FBUtilities.timestampMicros(), namespace, functionName);
+        if (mutation == null)
+            throw new InvalidRequestException(String.format("Cannot drop non existing function '%s'.", functionName));
+
+        logger.info(String.format("Drop Function '%s::%s'", namespace, functionName));
+        announce(mutation, announceLocally);
+    }
+
+    public static void announceNewFunction(UFMetaData function, boolean announceLocally)
+        throws ConfigurationException, SyntaxException
+    {
+        Mutation mutation = UFMetaData.createOrReplaceFunction(FBUtilities.timestampMicros(), function);
+        if (mutation == null)
+            throw new ConfigurationException(String.format("Function '%s' already exists.", function.qualifiedName));
+
+        logger.info(String.format("Create Function '%s'", function));
+        announce(mutation, announceLocally);
+    }
+
     /**
      * actively announce a new version to active hosts via rpc
      * @param schema The schema mutation to be applied

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index b7c5e68..85943cf 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -208,18 +208,18 @@ public abstract class Event
 
         public final Change change;
         public final Target target;
-        public final String keyspace;
-        public final String tableOrType;
+        public final String keyOrNamespace;
+        public final String tableOrTypeOrFunction;
 
-        public SchemaChange(Change change, Target target, String keyspace, String tableOrType)
+        public SchemaChange(Change change, Target target, String keyOrNamespace, String tableOrTypeOrFunction)
         {
             super(Type.SCHEMA_CHANGE);
             this.change = change;
             this.target = target;
-            this.keyspace = keyspace;
-            this.tableOrType = tableOrType;
+            this.keyOrNamespace = keyOrNamespace;
+            this.tableOrTypeOrFunction = tableOrTypeOrFunction;
             if (target != Target.KEYSPACE)
-                assert this.tableOrType != null : "Table or type should be set for non-keyspace schema change events";
+                assert this.tableOrTypeOrFunction != null : "Table or type should be set for non-keyspace schema change events";
         }
 
         public SchemaChange(Change change, String keyspace)
@@ -252,9 +252,9 @@ public abstract class Event
             {
                 CBUtil.writeEnumValue(change, dest);
                 CBUtil.writeEnumValue(target, dest);
-                CBUtil.writeString(keyspace, dest);
+                CBUtil.writeString(keyOrNamespace, dest);
                 if (target != Target.KEYSPACE)
-                    CBUtil.writeString(tableOrType, dest);
+                    CBUtil.writeString(tableOrTypeOrFunction, dest);
             }
             else
             {
@@ -263,14 +263,14 @@ public abstract class Event
                     // For the v1/v2 protocol, we have no way to represent type changes, so we simply say the keyspace
                     // was updated.  See CASSANDRA-7617.
                     CBUtil.writeEnumValue(Change.UPDATED, dest);
-                    CBUtil.writeString(keyspace, dest);
+                    CBUtil.writeString(keyOrNamespace, dest);
                     CBUtil.writeString("", dest);
                 }
                 else
                 {
                     CBUtil.writeEnumValue(change, dest);
-                    CBUtil.writeString(keyspace, dest);
-                    CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest);
+                    CBUtil.writeString(keyOrNamespace, dest);
+                    CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction, dest);
                 }
             }
         }
@@ -281,10 +281,10 @@ public abstract class Event
             {
                 int size = CBUtil.sizeOfEnumValue(change)
                          + CBUtil.sizeOfEnumValue(target)
-                         + CBUtil.sizeOfString(keyspace);
+                         + CBUtil.sizeOfString(keyOrNamespace);
 
                 if (target != Target.KEYSPACE)
-                    size += CBUtil.sizeOfString(tableOrType);
+                    size += CBUtil.sizeOfString(tableOrTypeOrFunction);
 
                 return size;
             }
@@ -293,25 +293,25 @@ public abstract class Event
                 if (target == Target.TYPE)
                 {
                     return CBUtil.sizeOfEnumValue(Change.UPDATED)
-                         + CBUtil.sizeOfString(keyspace)
+                         + CBUtil.sizeOfString(keyOrNamespace)
                          + CBUtil.sizeOfString("");
                 }
                 return CBUtil.sizeOfEnumValue(change)
-                     + CBUtil.sizeOfString(keyspace)
-                     + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType);
+                     + CBUtil.sizeOfString(keyOrNamespace)
+                     + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction);
             }
         }
 
         @Override
         public String toString()
         {
-            return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType);
+            return change + " " + target + " " + keyOrNamespace + (tableOrTypeOrFunction == null ? "" : "." + tableOrTypeOrFunction);
         }
 
         @Override
         public int hashCode()
         {
-            return Objects.hashCode(change, target, keyspace, tableOrType);
+            return Objects.hashCode(change, target, keyOrNamespace, tableOrTypeOrFunction);
         }
 
         @Override
@@ -323,8 +323,8 @@ public abstract class Event
             SchemaChange scc = (SchemaChange)other;
             return Objects.equal(change, scc.change)
                 && Objects.equal(target, scc.target)
-                && Objects.equal(keyspace, scc.keyspace)
-                && Objects.equal(tableOrType, scc.tableOrType);
+                && Objects.equal(keyOrNamespace, scc.keyOrNamespace)
+                && Objects.equal(tableOrTypeOrFunction, scc.tableOrTypeOrFunction);
         }
     }
 }


Mime
View raw message