drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [08/11] drill git commit: DRILL-4726: Dynamic UDF Support
Date Tue, 18 Oct 2016 23:45:28 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java
new file mode 100644
index 0000000..4ebb3e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import org.apache.drill.common.scanner.persistence.ScanResult;
+
+/**
+ * Read-only holder describing one scanned jar. It bundles together:
+ * <ul>
+ *   <li>the jar name,</li>
+ *   <li>the scan of packages, classes and annotations found in the jar,</li>
+ *   <li>the unique class loader of the jar.</li>
+ * </ul>
+ */
+public class JarScan {
+
+  private final ClassLoader classLoader;
+  private final ScanResult scanResult;
+  private final String jarName;
+
+  public JarScan(String jarName, ScanResult scanResult, ClassLoader classLoader) {
+    this.classLoader = classLoader;
+    this.scanResult = scanResult;
+    this.jarName = jarName;
+  }
+  /** @return jar name passed to the constructor */
+  public String getJarName() {
+    return this.jarName;
+  }
+  /** @return scan result passed to the constructor */
+  public ScanResult getScanResult() {
+    return this.scanResult;
+  }
+  /** @return class loader passed to the constructor */
+  public ClassLoader getClassLoader() {
+    return this.classLoader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
new file mode 100644
index 0000000..03fd608
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.exception.FunctionValidationException;
+import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+import org.apache.drill.exec.expr.fn.FunctionConverter;
+import org.apache.drill.exec.planner.logical.DrillConstExecutor;
+import org.apache.drill.exec.planner.sql.DrillOperatorTable;
+import org.apache.drill.exec.planner.sql.DrillSqlAggOperator;
+import org.apache.drill.exec.planner.sql.DrillSqlAggOperatorWithoutInference;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference;
+
+/**
+ * Registry of Drill functions, grouped by the jar they were registered from. Function names are case insensitive.
+ */
+public class LocalFunctionRegistry {
+  /** Jar name under which functions found by the start-up classpath scan are registered. */
+  public static final String BUILT_IN = "built-in";
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalFunctionRegistry.class);
+  private static final String functionSignaturePattern = "%s(%s)";
+  // Argument count ranges [min, max] for functions whose arity is fixed here rather than taken from their holders.
+  private static final ImmutableMap<String, Pair<Integer, Integer>> registeredFuncNameToArgRange = ImmutableMap.<String, Pair<Integer, Integer>> builder()
+      // CONCAT is allowed to take [1, infinity) number of arguments.
+      // Currently, this flexibility is offered by DrillOptiq to rewrite it as
+      // a nested structure
+      .put("CONCAT", Pair.of(1, Integer.MAX_VALUE))
+
+      // When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as
+      // another function based on the second argument (encodingType)
+      .put("LENGTH", Pair.of(1, 2))
+
+      // Dummy functions
+      .put("CONVERT_TO", Pair.of(2, 2))
+      .put("CONVERT_FROM", Pair.of(2, 2))
+      .put("FLATTEN", Pair.of(1, 1)).build();
+
+  private final FunctionRegistryHolder registryHolder;
+
+  /** Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in.
+   * Built-in functions are not allowed to be unregistered. */
+  public LocalFunctionRegistry(ScanResult classpathScan) {
+    registryHolder = new FunctionRegistryHolder();
+    validate(BUILT_IN, classpathScan);
+    register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())));
+    if (logger.isTraceEnabled()) {
+      StringBuilder allFunctions = new StringBuilder();
+      for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) {
+        allFunctions.append(method.toString()).append("\n");
+      }
+      logger.trace("Registered functions: [\n{}]", allFunctions);
+    }
+  }
+
+  /**
+   * @return local function registry version number
+   */
+  public long getVersion() {
+    return registryHolder.getVersion();
+  }
+
+  /**
+   * Validates all functions present in the jar against the functions registered so far.
+   * Throws {@link JarValidationException} if jar with the same name has been already registered.
+   * Throws {@link FunctionValidationException} if:
+   * <ol>
+   *  <li>Conflicting function with the same signature is found.</li>
+   *  <li>Aggregating function is not deterministic.</li>
+   *</ol>
+   * @param jarName jar name to be validated
+   * @param scanResult scan of all classes present in jar
+   * @return list of validated function signatures
+   */
+  public List<String> validate(String jarName, ScanResult scanResult) {
+    List<String> functions = Lists.newArrayList();
+    FunctionConverter converter = new FunctionConverter();
+    List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses();
+
+    if (registryHolder.containsJar(jarName)) {
+      throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
+    }
+    // Signatures known so far; every signature accepted below is added so duplicates inside this jar are caught too.
+    final ListMultimap<String, String> allFuncWithSignatures = registryHolder.getAllFunctionsWithSignatures();
+
+    for (AnnotatedClassDescriptor func : providerClasses) {
+      DrillFuncHolder holder = converter.getHolder(func, ClassLoader.getSystemClassLoader());
+      if (holder != null) {
+        String functionInput = holder.getInputParameters();
+        // Names are lower-cased with Locale.ROOT so that registry keys do not depend on the JVM default locale.
+        String[] names = holder.getRegisteredNames();
+        for (String name : names) {
+          String functionName = name.toLowerCase(Locale.ROOT);
+          String functionSignature = String.format(functionSignaturePattern, functionName, functionInput);
+
+          if (allFuncWithSignatures.get(functionName).contains(functionSignature)) {
+            throw new FunctionValidationException(String.format("Found duplicated function in %s: %s",
+                registryHolder.getJarNameByFunctionSignature(functionName, functionSignature), functionSignature));
+          } else if (holder.isAggregating() && !holder.isDeterministic()) {
+            throw new FunctionValidationException(
+                String.format("Aggregate functions must be deterministic: %s", func.getClassName()));
+          } else {
+            functions.add(functionSignature);
+            allFuncWithSignatures.put(functionName, functionSignature);
+          }
+        }
+      } else {
+        logger.warn("Unable to initialize function for class {}", func.getClassName());
+      }
+    }
+    return functions;
+  }
+
+  /**
+   * Registers all functions present in jar.
+   * If jar name is already registered, all jar related functions will be overridden.
+   * To prevent classpath collisions during loading and unloading jars,
+   * each jar is shipped with its own class loader.
+   * Classes for which no function holder can be created are skipped.
+   * @param jars list of jars to be registered
+   */
+  public void register(List<JarScan> jars) {
+    Map<String, List<FunctionHolder>> newJars = Maps.newHashMap();
+    for (JarScan jarScan : jars) {
+      FunctionConverter converter = new FunctionConverter();
+      List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses();
+      List<FunctionHolder> functions = Lists.newArrayList();
+      newJars.put(jarScan.getJarName(), functions);
+      for (AnnotatedClassDescriptor func : providerClasses) {
+        DrillFuncHolder holder = converter.getHolder(func, jarScan.getClassLoader());
+        if (holder != null) {
+          String functionInput = holder.getInputParameters();
+          String[] names = holder.getRegisteredNames();
+          for (String name : names) {
+            String functionName = name.toLowerCase(Locale.ROOT);
+            String functionSignature = String.format(functionSignaturePattern, functionName, functionInput);
+            functions.add(new FunctionHolder(functionName, functionSignature, holder));
+          }
+        }
+      }
+    }
+    registryHolder.addJars(newJars);
+  }
+
+  /**
+   * Removes all functions associated with the given jar name.
+   * Functions marked as built-in are not allowed to be unregistered.
+   * If user attempts to unregister built-in functions, logs warning and does nothing.
+   * Jar name is case-sensitive.
+   *
+   * @param jarName jar name to be unregistered
+   */
+  public void unregister(String jarName) {
+    if (BUILT_IN.equals(jarName)) {
+      logger.warn("Functions marked as built-in are not allowed to be unregistered.");
+      return;
+    }
+    registryHolder.removeJar(jarName);
+  }
+
+  /**
+   * Returns list of jar names registered in function registry.
+   *
+   * @return list of jar names
+   */
+  public List<String> getAllJarNames() {
+    return registryHolder.getAllJarNames();
+  }
+
+  /**
+   * @return quantity of all registered functions
+   */
+  public int size() {
+    return registryHolder.functionsSize();
+  }
+
+  /** Case-insensitive lookup of function holders by function name.
+   * @param name function name
+   * @param version holder passed on to the registry holder together with the lookup (presumably receives its version)
+   * @return all function holders associated with the function name */
+  public List<DrillFuncHolder> getMethods(String name, AtomicLong version) {
+    return registryHolder.getHoldersByFunctionName(name.toLowerCase(Locale.ROOT), version);
+  }
+  /** Case-insensitive lookup of function holders by function name, without a version holder. */
+  public List<DrillFuncHolder> getMethods(String name) {
+    return registryHolder.getHoldersByFunctionName(name.toLowerCase(Locale.ROOT));
+  }
+
+  /**
+   * Registers all functions present in {@link DrillOperatorTable},
+   * also sets local registry version used at the moment of registering.
+   *
+   * @param operatorTable drill operator table
+   */
+  public void register(DrillOperatorTable operatorTable) {
+    AtomicLong versionHolder = new AtomicLong();
+    final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
+    operatorTable.setFunctionRegistryVersion(versionHolder.get());
+    registerOperatorsWithInference(operatorTable, registeredFunctions);
+    registerOperatorsWithoutInference(operatorTable, registeredFunctions);
+  }
+  /** Builds one scalar and/or one aggregate operator per function name and adds them to the table with inference. */
+  private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) {
+    final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap();
+    final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap();
+    for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) {
+      final ArrayListMultimap<Pair<Integer, Integer>, DrillFuncHolder> functions = ArrayListMultimap.create();
+      final ArrayListMultimap<Integer, DrillFuncHolder> aggregateFunctions = ArrayListMultimap.create();
+      final String name = function.getKey().toUpperCase(Locale.ROOT);
+      boolean isDeterministic = true;
+      for (DrillFuncHolder func : function.getValue()) {
+        final int paramCount = func.getParamCount();
+        if (func.isAggregating()) {
+          aggregateFunctions.put(paramCount, func);
+        } else {
+          final Pair<Integer, Integer> argNumberRange;
+          if (registeredFuncNameToArgRange.containsKey(name)) {
+            argNumberRange = registeredFuncNameToArgRange.get(name);
+          } else {
+            argNumberRange = Pair.of(paramCount, paramCount);
+          }
+          functions.put(argNumberRange, func);
+        }
+        // a single non-deterministic implementation makes the whole scalar operator non-deterministic
+        if (!func.isDeterministic()) {
+          isDeterministic = false;
+        }
+      }
+      for (Entry<Pair<Integer, Integer>, Collection<DrillFuncHolder>> entry : functions.asMap().entrySet()) {
+        final Pair<Integer, Integer> range = entry.getKey();
+        final int max = range.getRight();
+        final int min = range.getLeft();
+        if (!map.containsKey(name)) {
+          map.put(name, new DrillSqlOperator.DrillSqlOperatorBuilder()
+              .setName(name));
+        }
+
+        final DrillSqlOperator.DrillSqlOperatorBuilder drillSqlOperatorBuilder = map.get(name);
+        drillSqlOperatorBuilder
+            .addFunctions(entry.getValue())
+            .setArgumentCount(min, max)
+            .setDeterministic(isDeterministic);
+      }
+      for (Entry<Integer, Collection<DrillFuncHolder>> entry : aggregateFunctions.asMap().entrySet()) {
+        if (!mapAgg.containsKey(name)) {
+          mapAgg.put(name, new DrillSqlAggOperator.DrillSqlAggOperatorBuilder().setName(name));
+        }
+
+        final DrillSqlAggOperator.DrillSqlAggOperatorBuilder drillSqlAggOperatorBuilder = mapAgg.get(name);
+        drillSqlAggOperatorBuilder
+            .addFunctions(entry.getValue())
+            .setArgumentCount(entry.getKey(), entry.getKey());
+      }
+    }
+
+    for (final Entry<String, DrillSqlOperator.DrillSqlOperatorBuilder> entry : map.entrySet()) {
+      operatorTable.addOperatorWithInference(
+          entry.getKey(),
+          entry.getValue().build());
+    }
+
+    for (final Entry<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> entry : mapAgg.entrySet()) {
+      operatorTable.addOperatorWithInference(
+          entry.getKey(),
+          entry.getValue().build());
+    }
+  }
+  /** Adds one operator per distinct (function name, parameter count) pair to the table without inference. */
+  private void registerOperatorsWithoutInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) {
+    SqlOperator op;
+    for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) {
+      Set<Integer> argCounts = Sets.newHashSet();
+      String name = function.getKey().toUpperCase(Locale.ROOT);
+      for (DrillFuncHolder func : function.getValue()) {
+        if (argCounts.add(func.getParamCount())) {
+          if (func.isAggregating()) {
+            op = new DrillSqlAggOperatorWithoutInference(name, func.getParamCount());
+          } else {
+            boolean isDeterministic;
+            // prevent Drill from folding constant functions with types that cannot be materialized
+            // into literals
+            if (DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) {
+              isDeterministic = false;
+            } else {
+              isDeterministic = func.isDeterministic();
+            }
+            op = new DrillSqlOperatorWithoutInference(name, func.getParamCount(), func.getReturnType(), isDeterministic);
+          }
+          operatorTable.addOperatorWithoutInference(function.getKey(), op);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
new file mode 100644
index 0000000..4ce4a19
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreListener;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.File;
+import java.io.IOException;
+
+import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
+
+/**
+ * Is responsible for remote function registry management.
+ * Creates all remote registry areas at startup and validates them,
+ * during init establishes connections with three udf related stores.
+ * Provides tools to work with three udf related stores, gives access to remote registry areas.
+ * <p>
+ * There are three udf stores:
+ * REGISTRY - persistent store, stores remote function registry {@link Registry} under udf path
+ * which contains information about all dynamically registered jars and their function signatures.
+ * If connection is created for the first time, puts empty remote registry.
+ * <p>
+ * UNREGISTRATION - transient store, stores information under udf/unregister path.
+ * udf/unregister path is persistent by itself but any child created will be transient.
+ * Whenever user submits request to unregister jar, child path with jar name is created under this store.
+ * This store also holds unregistration listener, which notifies all drill bits when child path is created,
+ * so they can start local unregistration process.
+ * <p>
+ * JARS - transient store, stores information under udf/jars path.
+ * udf/jars path is persistent by itself but any child created will be transient.
+ * Serves as lock, not allowing two actions to be performed on the same jar at the same time.
+ * There are two types of actions: {@link Action#REGISTRATION} and {@link Action#UNREGISTRATION}.
+ * Before starting any action, user tries to create child path with jar name under this store
+ * and if such path already exists, receives action being performed on that very jar.
+ * When user finishes the action, the child path with jar name is deleted.
+ * <p>
+ * There are three udf areas:
+ * STAGING - area where user copies binary and source jars before starting registration process.
+ * REGISTRY - area where registered jars are stored.
+ * TMP - area where source and binary jars are backed up in unique folder during registration process.
+ */
+public class RemoteFunctionRegistry implements AutoCloseable {
+  /** Key under which the remote function registry is kept in the REGISTRY store. */
+  public static final String REGISTRY = "registry";
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class);
+  private static final ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT);
+
+  private final TransientStoreListener unregistrationListener;
+  private int retryAttempts;
+  private FileSystem fs;
+  private Path registryArea;
+  private Path stagingArea;
+  private Path tmpArea;
+
+  private PersistentStore<Registry> registry;
+  private TransientStore<String> unregistration;
+  private TransientStore<String> jars;
+  /** @param unregistrationListener listener attached to the UNREGISTRATION store when stores are prepared */
+  public RemoteFunctionRegistry(TransientStoreListener unregistrationListener) {
+    this.unregistrationListener = unregistrationListener;
+  }
+  /** Connects to the udf stores, prepares the udf areas and reads the retry attempts setting from config. */
+  public void init(DrillConfig config, PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) {
+    prepareStores(storeProvider, coordinator);
+    prepareAreas(config);
+    this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS);
+  }
+  /** @return registry stored under the {@link #REGISTRY} key, read without a version holder */
+  public Registry getRegistry() {
+    return registry.get(REGISTRY, null);
+  }
+  /** @return registry stored under the {@link #REGISTRY} key; {@code version} is handed to the store's get */
+  public Registry getRegistry(DataChangeVersion version) {
+    return registry.get(REGISTRY, version);
+  }
+  /** Puts registry content under the {@link #REGISTRY} key, handing {@code version} to the store's put. */
+  public void updateRegistry(Registry registryContent, DataChangeVersion version) throws VersionMismatchException {
+    registry.put(REGISTRY, registryContent, version);
+  }
+  /** Puts the jar name into the UNREGISTRATION store unless an entry for it is already present. */
+  public void submitForUnregistration(String jar) {
+    unregistration.putIfAbsent(jar, jar);
+  }
+  /** Removes the jar name from the UNREGISTRATION store. */
+  public void finishUnregistration(String jar) {
+    unregistration.remove(jar);
+  }
+  /** Puts the action name under the jar name into the JARS store if absent; returns the store's putIfAbsent result. */
+  public String addToJars(String jar, Action action) {
+    return jars.putIfAbsent(jar, action.toString());
+  }
+  /** Removes the jar name from the JARS store. */
+  public void removeFromJars(String jar) {
+    jars.remove(jar);
+  }
+  /** @return retry attempts read from config during {@link #init} */
+  public int getRetryAttempts() {
+    return retryAttempts;
+  }
+  /** @return file system on which the udf areas were created */
+  public FileSystem getFs() {
+    return fs;
+  }
+  /** @return path to the REGISTRY area */
+  public Path getRegistryArea() {
+    return registryArea;
+  }
+  /** @return path to the STAGING area */
+  public Path getStagingArea() {
+    return stagingArea;
+  }
+  /** @return path to the TMP area */
+  public Path getTmpArea() {
+    return tmpArea;
+  }
+
+  /**
+   * Connects to three stores: REGISTRY, UNREGISTRATION, JARS.
+   * Puts default instance of remote function registry into REGISTRY store if store is initiated for the first time.
+   * Registers unregistration listener in UNREGISTRATION store.
+   */
+  private void prepareStores(PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) {
+    try {
+      PersistentStoreConfig<Registry> registrationConfig = PersistentStoreConfig
+          .newProtoBuilder(SchemaUserBitShared.Registry.WRITE, SchemaUserBitShared.Registry.MERGE)
+          .name("udf")
+          .persist()
+          .build();
+      registry = storeProvider.getOrCreateStore(registrationConfig);
+      registry.putIfAbsent(REGISTRY, Registry.getDefaultInstance());
+    } catch (StoreException e) {
+      throw new DrillRuntimeException("Failure while loading remote registry.", e);
+    }
+
+    TransientStoreConfig<String> unregistrationConfig = TransientStoreConfig.
+        newJacksonBuilder(mapper, String.class).name("udf/unregister").build();
+    unregistration = coordinator.getOrCreateTransientStore(unregistrationConfig);
+    unregistration.addListener(unregistrationListener);
+
+    TransientStoreConfig<String> jarsConfig = TransientStoreConfig.
+        newJacksonBuilder(mapper, String.class).name("udf/jars").build();
+    jars = coordinator.getOrCreateTransientStore(jarsConfig);
+  }
+
+  /**
+   * Creates if absent and validates three udf areas: STAGING, REGISTRY and TMP.
+   * Generates udf areas root from {@link ExecConstants#UDF_DIRECTORY_ROOT},
+   * if not set, uses user home directory instead.
+   */
+  private void prepareAreas(DrillConfig config) {
+    Configuration conf = new Configuration();
+    if (config.hasPath(ExecConstants.UDF_DIRECTORY_FS)) {
+      conf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getString(ExecConstants.UDF_DIRECTORY_FS));
+    }
+    // Fail explicitly here: continuing without a file system would only surface later as a NullPointerException.
+    try {
+      this.fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      throw new DrillRuntimeException(String.format("Error during file system %s setup", conf.get(FileSystem.FS_DEFAULT_NAME_KEY)), e);
+    }
+
+    String root = fs.getHomeDirectory().toUri().getPath();
+    if (config.hasPath(ExecConstants.UDF_DIRECTORY_ROOT)) {
+      root = config.getString(ExecConstants.UDF_DIRECTORY_ROOT);
+    }
+
+    this.registryArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_REGISTRY));
+    this.stagingArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_STAGING));
+    this.tmpArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_TMP));
+  }
+
+  /**
+   * Concatenates udf area with root directory.
+   * Creates udf area, if area does not exist.
+   * Checks if area exists and is directory, if it is writable for current user,
+   * throws {@link DrillRuntimeException} otherwise (any failure, including a failed check, is wrapped).
+   *
+   * @param fs file system where area should be created or checked
+   * @param root root directory
+   * @param directory directory path
+   * @return path to area
+   */
+  private Path createArea(FileSystem fs, String root, String directory) {
+    Path path = new Path(new File(root, directory).toURI().getPath());
+    String fullPath = path.toUri().getPath();
+    try {
+      fs.mkdirs(path);
+      Preconditions.checkState(fs.exists(path), "Area [%s] must exist", fullPath);
+      FileStatus fileStatus = fs.getFileStatus(path);
+      Preconditions.checkState(fileStatus.isDirectory(), "Area [%s] must be a directory", fullPath);
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      FsPermission permission = fileStatus.getPermission();
+      // It is considered that current user has write rights on directory if:
+      // 1. current user is owner of the directory and has write rights
+      // 2. current user is in group that has write rights
+      // 3. any user has write rights
+      Preconditions.checkState(
+          (currentUser.getUserName().equals(fileStatus.getOwner())
+              && permission.getUserAction().implies(FsAction.WRITE)) ||
+          (Lists.newArrayList(currentUser.getGroupNames()).contains(fileStatus.getGroup())
+              && permission.getGroupAction().implies(FsAction.WRITE)) ||
+          permission.getOtherAction().implies(FsAction.WRITE),
+          "Area [%s] must be writable and executable for application user", fullPath);
+    } catch (Exception e) {
+      throw new DrillRuntimeException(String.format("Error during udf area creation [%s] on file system [%s]", fullPath, fs.getUri()), e);
+    }
+    return path;
+  }
+  /** Closes the file system and the three stores; a failure while closing is logged and not rethrown. */
+  @Override
+  public void close() {
+    try {
+      AutoCloseables.close(
+          fs,
+          registry,
+          unregistration,
+          jars);
+    } catch (Exception e) {
+      logger.warn("Failure on close()", e);
+    }
+  }
+  /** Kind of action being performed on a jar; its name is the value stored in the JARS store. */
+  public enum Action {
+    REGISTRATION,
+    UNREGISTRATION
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 44e33cb..ceb1224 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
@@ -222,6 +223,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return queryContextInfo;
   }
 
+  public RemoteFunctionRegistry getRemoteFunctionRegistry() { // plain delegation to drillbitContext
+    return this.drillbitContext.getRemoteFunctionRegistry();
+  }
+
   @Override
   public ContextInformation getContextInformation() {
     return contextInformation;

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 5f489b4..6944a7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -22,9 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlPrefixOperator;
 import org.apache.drill.common.expression.FunctionCallFactory;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.calcite.sql.SqlFunctionCategory;
@@ -35,10 +33,10 @@ import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.SystemOptionManager;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Implementation of {@link SqlOperatorTable} that contains standard operators and functions provided through
@@ -54,6 +52,9 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
 
   private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithoutInferenceMap = ArrayListMultimap.create();
   private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create();
+  // indicates the local function registry version based on which drill operators were loaded,
+  // is used to decide whether the operator table needs to be reloaded when a function signature was not found
+  private long functionRegistryVersion;
 
   private final OptionManager systemOptionManager;
 
@@ -64,6 +65,23 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
     this.systemOptionManager = systemOptionManager;
   }
 
+  /** Cleans up all operator holders and reloads operators */
+  public void reloadOperators(FunctionImplementationRegistry registry) {
+    drillOperatorsWithoutInference.clear();
+    drillOperatorsWithInference.clear();
+    drillOperatorsWithoutInferenceMap.clear();
+    drillOperatorsWithInferenceMap.clear();
+    registry.register(this);
+  }
+
+  public long setFunctionRegistryVersion(long version) {
+    return functionRegistryVersion = version;
+  }
+
+  public long getFunctionRegistryVersion() {
+    return functionRegistryVersion;
+  }
+
   /**
    * When the option planner.type_inference.enable is turned off, the operators which are added via this method
    * will be used.

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index dbe620d..19123d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -24,6 +24,9 @@ import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.FunctionNotFoundException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.physical.PhysicalPlan;
@@ -91,7 +94,7 @@ public class DrillSqlWorker {
     }
 
     try {
-      return handler.getPlan(sqlNode);
+      return getPhysicalPlan(handler, sqlNode, context);
     } catch(ValidationException e) {
       String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
       throw UserException.validationError(e)
@@ -108,5 +111,27 @@ public class DrillSqlWorker {
     }
   }
 
+  /**
+   * Returns query physical plan.
+   * In case of {@link FunctionNotFoundException} and dynamic udf support is enabled, attempts to load remote functions.
+   * If at least one function was loaded or local function registry version has changed,
+   * makes one more attempt to get query physical plan.
+   */
+  private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, SqlNode sqlNode, QueryContext context)
+      throws RelConversionException, IOException, ForemanSetupException, ValidationException {
+    try {
+      return handler.getPlan(sqlNode);
+    } catch (FunctionNotFoundException e) {
+      if (context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+        DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
+        FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
+        if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
+          drillOperatorTable.reloadOperators(functionRegistry);
+          return handler.getPlan(sqlNode);
+        }
+      }
+      throw e;
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 3d0d538..0c3c6a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -54,11 +54,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.calcite.sql.validate.AggregatingSelectScope;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.FunctionNotFoundException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -160,6 +164,11 @@ public class SqlConverter {
       SqlNode validatedNode = validator.validate(parsedNode);
       return validatedNode;
     } catch (RuntimeException e) {
+      final Throwable rootCause = ExceptionUtils.getRootCause(e);
+      if (rootCause instanceof SqlValidatorException
+          && StringUtils.contains(rootCause.getMessage(), "No match found for function signature")) {
+        throw new FunctionNotFoundException(rootCause.getMessage(), e);
+      }
       UserException.Builder builder = UserException
           .validationError(e)
           .addContext("SQL Query", sql);

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
new file mode 100644
index 0000000..8515c8a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.FunctionValidationException;
+import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction;
+import org.apache.drill.exec.proto.UserBitShared.Jar;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.drill.exec.util.JarUtil;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+public class CreateFunctionHandler extends DefaultSqlHandler {
+
+  private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CreateFunctionHandler.class);
+
+  public CreateFunctionHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
+  /**
+   * Registers UDFs dynamically. Process consists of several steps:
+   * <ol>
+   * <li>Registering jar in jar registry to ensure that several jars with the same name are not registered.</li>
+   * <li>Binary and source jars validation and back up.</li>
+   * <li>Validation against local function registry.</li>
+   * <li>Validation against remote function registry.</li>
+   * <li>Remote function registry update.</li>
+   * <li>Copying of jars to registry area and clean up.</li>
+   * </ol>
+   *
+   * UDFs registration is allowed only if dynamic UDFs support is enabled.
+   *
+   * @return - Single row indicating list of registered UDFs, or error message otherwise.
+   */
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
+    if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+      throw UserException.validationError()
+          .message("Dynamic UDFs support is disabled.")
+          .build(logger);
+    }
+    RemoteFunctionRegistry remoteRegistry = context.getRemoteFunctionRegistry();
+    JarManager jarManager = new JarManager(sqlNode, remoteRegistry);
+
+    boolean inProgress = false;
+    try {
+      final String action = remoteRegistry.addToJars(jarManager.getBinaryName(), RemoteFunctionRegistry.Action.REGISTRATION);
+      if (!(inProgress = action == null)) {
+        return DirectPlan.createDirectPlan(context, false,
+            String.format("Jar with %s name is used. Action: %s", jarManager.getBinaryName(), action));
+      }
+
+      jarManager.initRemoteBackup();
+      List<String> functions = validateAgainstLocalRegistry(jarManager, context.getFunctionRegistry());
+      initRemoteRegistration(functions, jarManager, remoteRegistry, remoteRegistry.getRetryAttempts());
+      jarManager.deleteQuietlyFromStagingArea();
+
+      return DirectPlan.createDirectPlan(context, true,
+          String.format("The following UDFs in jar %s have been registered:\n%s", jarManager.getBinaryName(), functions));
+
+    } catch (Exception e) {
+      logger.error("Error during UDF registration", e);
+      return DirectPlan.createDirectPlan(context, false, e.getMessage());
+    } finally {
+      if (inProgress) {
+        remoteRegistry.removeFromJars(jarManager.getBinaryName());
+      }
+      jarManager.cleanUp();
+    }
+  }
+
+
+  /**
+   * Instantiates copying of binary to local file system
+   * and validates functions from this jar against local function registry.
+   *
+   * @param jarManager helps copying binary to local file system
+   * @param localFunctionRegistry instance of local function registry to instantiate local validation
+   * @return list of validated function signatures
+   * @throws IOException in case of problems during copying binary to local file system
+   * @throws FunctionValidationException in case duplicated function was found
+   */
+  private List<String> validateAgainstLocalRegistry(JarManager jarManager,
+                                                    FunctionImplementationRegistry localFunctionRegistry) throws IOException {
+    Path localBinary = jarManager.copyBinaryToLocal();
+    return localFunctionRegistry.validate(localBinary);
+  }
+
+  /**
+   * Validates jar and its functions against remote jars.
+   * First checks if there is no duplicate by jar name and then looks for duplicates among functions.
+   *
+   * @param remoteJars list of remote jars to validate against
+   * @param jarName jar name to be validated
+   * @param functions list of functions present in jar to be validated
+   * @throws JarValidationException in case of jar with the same name was found
+   * @throws FunctionValidationException in case duplicated function was found
+   */
+  private void validateAgainstRemoteRegistry(List<Jar> remoteJars, String jarName, List<String> functions) {
+    for (Jar remoteJar : remoteJars) {
+      if (remoteJar.getName().equals(jarName)) {
+        throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
+      }
+      for (String remoteFunction : remoteJar.getFunctionSignatureList()) {
+        for (String func : functions) {
+          if (remoteFunction.equals(func)) {
+            throw new FunctionValidationException(
+                String.format("Found duplicated function in %s: %s", remoteJar.getName(), remoteFunction));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Instantiates remote registration. First gets remote function registry with version.
+   * Version is used to ensure that we update the same registry we validated against.
+   * Then validates against list of remote jars.
+   * If validation is successful, copies jars to registry area and starts updating remote function registry.
+   * If during update {@link VersionMismatchException} was detected,
+   * calls itself recursively to instantiate new remote registration process.
+   * Since remote registry version has changed, we need to re-validate against remote function registry one more time.
+   * Each time recursive call occurs, decreases retry attempts counter by one.
+   * If retry attempts number hits 0, throws exception that failed to update remote function registry.
+   * Jars are deleted from registry area only if registration has failed, a successful retry keeps them.
+   *
+   * @param functions list of functions present in jar
+   * @param jarManager helper class for copying jars to registry area
+   * @param remoteRegistry remote function registry
+   * @param retryAttempts number of retry attempts
+   * @throws IOException in case of problems with copying jars to registry area
+   */
+  private void initRemoteRegistration(List<String> functions,
+                                      JarManager jarManager,
+                                      RemoteFunctionRegistry remoteRegistry,
+                                      int retryAttempts) throws IOException {
+    DataChangeVersion version = new DataChangeVersion();
+    List<Jar> remoteJars = remoteRegistry.getRegistry(version).getJarList();
+    validateAgainstRemoteRegistry(remoteJars, jarManager.getBinaryName(), functions);
+    jarManager.copyToRegistryArea();
+    boolean cleanUp = true;
+    List<Jar> jars = Lists.newArrayList(remoteJars);
+    jars.add(Jar.newBuilder().setName(jarManager.getBinaryName()).addAllFunctionSignature(functions).build());
+    Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
+    try {
+      remoteRegistry.updateRegistry(updatedRegistry, version);
+      cleanUp = false;
+    } catch (VersionMismatchException ex) {
+      if (retryAttempts-- == 0) {
+        throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit.");
+      }
+      initRemoteRegistration(functions, jarManager, remoteRegistry, retryAttempts);
+      // retry has registered the jars, so they must stay in registry area;
+      // if retry fails, its exception skips this line and jars are deleted in finally block below
+      cleanUp = false;
+    } finally {
+      if (cleanUp) {
+        jarManager.deleteQuietlyFromRegistryArea();
+      }
+    }
+  }
+
+  /**
+   * Inner helper class that encapsulates logic for working with jars.
+   * During initialization it creates path to staging jar, local and remote temporary jars, registry jars.
+   * Is responsible for validation, copying and deletion actions.
+   */
+  private class JarManager {
+
+    private final String binaryName;
+    private final FileSystem fs;
+
+    private final Path remoteTmpDir;
+    private final Path localTmpDir;
+
+    private final Path stagingBinary;
+    private final Path stagingSource;
+
+    private final Path tmpRemoteBinary;
+    private final Path tmpRemoteSource;
+
+    private final Path registryBinary;
+    private final Path registrySource;
+
+    JarManager(SqlNode sqlNode, RemoteFunctionRegistry remoteRegistry) throws ForemanSetupException {
+      SqlCreateFunction node = unwrap(sqlNode, SqlCreateFunction.class);
+      this.binaryName = ((SqlCharStringLiteral) node.getJar()).toValue();
+      String sourceName = JarUtil.getSourceName(binaryName);
+
+      this.stagingBinary = new Path(remoteRegistry.getStagingArea(), binaryName);
+      this.stagingSource = new Path(remoteRegistry.getStagingArea(), sourceName);
+
+      this.remoteTmpDir = new Path(remoteRegistry.getTmpArea(), UUID.randomUUID().toString());
+      this.tmpRemoteBinary = new Path(remoteTmpDir, binaryName);
+      this.tmpRemoteSource = new Path(remoteTmpDir, sourceName);
+
+      this.registryBinary = new Path(remoteRegistry.getRegistryArea(), binaryName);
+      this.registrySource = new Path(remoteRegistry.getRegistryArea(), sourceName);
+
+      this.localTmpDir = new Path(Files.createTempDir().toURI());
+      this.fs = remoteRegistry.getFs();
+    }
+
+    /**
+     * @return binary jar name
+     */
+    String getBinaryName() {
+      return binaryName;
+    }
+
+    /**
+     * Validates that both binary and source jar are present in staging area,
+     * it is expected that binary and source have standard naming convention.
+     * Backs up both jars to unique folder in remote temporary area.
+     *
+     * @throws IOException in case of binary or source absence or problems during copying jars
+     */
+    void initRemoteBackup() throws IOException {
+      fs.getFileStatus(stagingBinary);
+      fs.getFileStatus(stagingSource);
+      fs.mkdirs(remoteTmpDir);
+      FileUtil.copy(fs, stagingBinary, fs, tmpRemoteBinary, false, true, fs.getConf());
+      FileUtil.copy(fs, stagingSource, fs, tmpRemoteSource, false, true, fs.getConf());
+    }
+
+    /**
+     * Copies binary jar to unique folder on local file system.
+     * Source jar is not needed for local validation.
+     *
+     * @return path to local binary jar
+     * @throws IOException in case of problems during copying binary jar
+     */
+    Path copyBinaryToLocal() throws IOException {
+      Path localBinary = new Path(localTmpDir, binaryName);
+      fs.copyToLocalFile(tmpRemoteBinary, localBinary);
+      return localBinary;
+    }
+
+    /**
+     * Copies binary and source jars to registry area,
+     * in case of {@link IOException} removes copied jar(-s) from registry area
+     *
+     * @throws IOException is re-thrown in case of problems during copying process
+     */
+    void copyToRegistryArea() throws IOException {
+      FileUtil.copy(fs, tmpRemoteBinary, fs, registryBinary, false, true, fs.getConf());
+      try {
+        FileUtil.copy(fs, tmpRemoteSource, fs, registrySource, false, true, fs.getConf());
+      } catch (IOException e) {
+        deleteQuietly(registryBinary, false);
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * Deletes binary and sources jars from staging area, in case of problems, logs warning and proceeds.
+     */
+    void deleteQuietlyFromStagingArea() {
+      deleteQuietly(stagingBinary, false);
+      deleteQuietly(stagingSource, false);
+    }
+
+    /**
+     * Deletes binary and sources jars from registry area, in case of problems, logs warning and proceeds.
+     */
+    void deleteQuietlyFromRegistryArea() {
+      deleteQuietly(registryBinary, false);
+      deleteQuietly(registrySource, false);
+    }
+
+    /**
+     * Removes quietly remote and local unique folders in temporary directories.
+     */
+    void cleanUp() {
+      FileUtils.deleteQuietly(new File(localTmpDir.toUri()));
+      deleteQuietly(remoteTmpDir, true);
+    }
+
+    /**
+     * Deletes quietly file or directory, in case of errors, logs warning and proceeds.
+     *
+     * @param path path to file or directory
+     * @param isDirectory set to true if we need to delete a directory
+     */
+    private void deleteQuietly(Path path, boolean isDirectory) {
+      try {
+        fs.delete(path, isDirectory);
+      } catch (IOException e) {
+        logger.warn(String.format("Error during deletion [%s]", path.toUri().getPath()), e);
+      }
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
new file mode 100644
index 0000000..5269a4b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.SqlDropFunction;
+import org.apache.drill.exec.proto.UserBitShared.Jar;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.drill.exec.util.JarUtil;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class DropFunctionHandler extends DefaultSqlHandler {
+
+  private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DropFunctionHandler.class);
+
+  public DropFunctionHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
+  /**
+   * Unregisters UDFs dynamically. Process consists of several steps:
+   * <ol>
+   * <li>Registering jar in jar registry to ensure that several jars with the same name are not being unregistered.</li>
+   * <li>Starts remote unregistration process, gets list of all jars and excludes jar to be deleted.</li>
+   * <li>Signals drill bits to start local unregistration process.</li>
+   * <li>Removes source and binary jars from registry area.</li>
+   * </ol>
+   *
+   * UDFs unregistration is allowed only if dynamic UDFs support is enabled.
+   * Only jars registered dynamically can be unregistered,
+   * built-in functions loaded at start up are not allowed to be unregistered.
+   *
+   * Limitation: before jar unregistration make sure no one is using functions from this jar.
+   * There is no guarantee that running queries will finish successfully or give correct result.
+   *
+   * @return - Single row indicating list of unregistered UDFs, raise exception otherwise
+   */
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
+    if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+      throw UserException.validationError()
+          .message("Dynamic UDFs support is disabled.")
+          .build(logger);
+    }
+
+    SqlDropFunction node = unwrap(sqlNode, SqlDropFunction.class);
+    String jarName = ((SqlCharStringLiteral) node.getJar()).toValue();
+    RemoteFunctionRegistry remoteFunctionRegistry = context.getRemoteFunctionRegistry();
+
+    boolean inProgress = false;
+    try {
+      final String action = remoteFunctionRegistry.addToJars(jarName, RemoteFunctionRegistry.Action.UNREGISTRATION);
+      if (!(inProgress = action == null)) {
+        return DirectPlan.createDirectPlan(context, false, String.format("Jar with %s name is used. Action: %s", jarName, action));
+      }
+
+      Jar deletedJar = unregister(jarName, remoteFunctionRegistry, remoteFunctionRegistry.getRetryAttempts());
+      if (deletedJar == null) {
+        return DirectPlan.createDirectPlan(context, false, String.format("Jar %s is not registered in remote registry", jarName));
+      }
+      remoteFunctionRegistry.submitForUnregistration(jarName);
+
+      removeJarFromArea(jarName, remoteFunctionRegistry.getFs(), remoteFunctionRegistry.getRegistryArea());
+      removeJarFromArea(JarUtil.getSourceName(jarName), remoteFunctionRegistry.getFs(), remoteFunctionRegistry.getRegistryArea());
+
+      return DirectPlan.createDirectPlan(context, true,
+          String.format("The following UDFs in jar %s have been unregistered:\n%s", jarName, deletedJar.getFunctionSignatureList()));
+
+    } catch (Exception e) {
+      logger.error("Error during UDF unregistration", e);
+      return DirectPlan.createDirectPlan(context, false, e.getMessage());
+    } finally {
+      if (inProgress) {
+        remoteFunctionRegistry.finishUnregistration(jarName);
+        remoteFunctionRegistry.removeFromJars(jarName);
+      }
+    }
+  }
+
+  /**
+   * First gets remote function registry with version.
+   * Version is used to ensure that we update the same registry we removed jars from.
+   * Looks for a jar to be deleted, if it finds one,
+   * attempts to update remote registry with updated list of jars, that excludes jar to be deleted.
+   * If during update {@link VersionMismatchException} was detected,
+   * calls itself recursively to instantiate new remote unregistration process and returns its result.
+   * Since remote registry version has changed we need to look for jar to be deleted one more time.
+   * Each time recursive call occurs, decreases retry attempts counter by one.
+   * If retry attempts number hits 0, throws exception that failed to update remote function registry.
+   *
+   * @param jarName jar name
+   * @param remoteFunctionRegistry remote function registry
+   * @param retryAttempts number of retry attempts
+   * @return jar that was unregistered, null if jar is absent in the registry version that was read last
+   */
+  private Jar unregister(String jarName, RemoteFunctionRegistry remoteFunctionRegistry, int retryAttempts) {
+    DataChangeVersion version = new DataChangeVersion();
+    Registry registry = remoteFunctionRegistry.getRegistry(version);
+    Jar jarToBeDeleted = null;
+    List<Jar> jars = Lists.newArrayList();
+    for (Jar j : registry.getJarList()) {
+      if (j.getName().equals(jarName)) {
+        jarToBeDeleted = j;
+      } else {
+        jars.add(j);
+      }
+    }
+    if (jarToBeDeleted != null) {
+      Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
+      try {
+        remoteFunctionRegistry.updateRegistry(updatedRegistry, version);
+      } catch (VersionMismatchException ex) {
+        if (retryAttempts-- == 0) {
+          throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit.");
+        }
+        return unregister(jarName, remoteFunctionRegistry, retryAttempts);
+      }
+    }
+    return jarToBeDeleted;
+  }
+
+  /**
+   * Removes jar from indicated area, in case of error logs it (with the cause) and proceeds.
+   *
+   * @param jarName jar name
+   * @param fs file system
+   * @param area path to area
+   */
+  private void removeJarFromArea(String jarName, FileSystem fs, Path area) {
+    try {
+      fs.delete(new Path(area, jarName), false);
+    } catch (IOException e) {
+      logger.error("Error removing jar {} from area {}", jarName, area.toUri().getPath(), e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index fa0d319..53e3cd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -174,6 +174,8 @@ public class CompoundIdentifierConverter extends SqlShuttle {
     rules.put(SqlRefreshMetadata.class, R(D));
     rules.put(SqlSetOption.class, R(D, D, D));
     rules.put(SqlDescribeSchema.class, R(D));
+    rules.put(SqlCreateFunction.class, R(D));
+    rules.put(SqlDropFunction.class, R(D));
     REWRITE_RULES = ImmutableMap.copyOf(rules);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java
new file mode 100644
index 0000000..c14f468
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.CreateFunctionHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import java.util.List;
+
+public class SqlCreateFunction extends DrillSqlCall {
+
+  private final SqlNode jar;
+
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_FUNCTION", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      return new SqlCreateFunction(pos, operands[0]);
+    }
+  };
+
+  public SqlCreateFunction(SqlParserPos pos, SqlNode jar) {
+    super(pos);
+    this.jar = jar;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    List<SqlNode> opList = Lists.newArrayList();
+    opList.add(jar);
+    return opList;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("CREATE");
+    writer.keyword("FUNCTION");
+    writer.keyword("USING");
+    writer.keyword("JAR");
+    jar.unparse(writer, leftPrec, rightPrec);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return new CreateFunctionHandler(config);
+  }
+
+  public SqlNode getJar() { return jar; }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java
new file mode 100644
index 0000000..77d2b76
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.DropFunctionHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import java.util.List;
+
+public class SqlDropFunction extends DrillSqlCall {
+
+  private final SqlNode jar;
+
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_FUNCTION", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      return new SqlDropFunction(pos, operands[0]);
+    }
+  };
+
+  public SqlDropFunction(SqlParserPos pos, SqlNode jar) {
+    super(pos);
+    this.jar = jar;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    List<SqlNode> opList = Lists.newArrayList();
+    opList.add(jar);
+    return opList;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("DROP");
+    writer.keyword("FUNCTION");
+    writer.keyword("USING");
+    writer.keyword("JAR");
+    jar.unparse(writer, leftPrec, rightPrec);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return new DropFunctionHandler(config);
+  }
+
+  public SqlNode getJar() { return jar; }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
index bd7c779..3eed022 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.TypeValidators;
+import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -84,10 +84,10 @@ public class InboundImpersonationManager {
   /**
    * Validator for impersonation policies.
    */
-  public static class InboundImpersonationPolicyValidator extends TypeValidators.AdminOptionValidator {
+  public static class InboundImpersonationPolicyValidator extends StringValidator {
 
     public InboundImpersonationPolicyValidator(String name, String def) {
-      super(name, def);
+      super(name, def, true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 55a2b05..3f74268 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -123,6 +123,7 @@ public class Drillbit implements AutoCloseable {
     storageRegistry.init();
     drillbitContext.getOptionManager().init();
     javaPropertiesToSystemOptions();
+    manager.getContext().getRemoteFunctionRegistry().init(context.getConfig(), storeProvider, coord);
     registrationHandle = coord.register(md);
     webServer.start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 1af6d11..3eb87ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -180,8 +181,12 @@ public class DrillbitContext implements AutoCloseable {
     return classpathScan;
   }
 
+  public RemoteFunctionRegistry getRemoteFunctionRegistry() { return functionRegistry.getRemoteFunctionRegistry(); }
+
   @Override
   public void close() throws Exception {
     getOptionManager().close();
+    getFunctionImplementationRegistry().close();
+    getRemoteFunctionRegistry().close();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
index db42603..82f4ab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -26,9 +26,16 @@ public abstract class OptionValidator {
   // Stored here as well as in the option static class to allow insertion of option optionName into
   // the error messages produced by the validator
   private final String optionName;
+  private final boolean isAdminOption;
 
+  /** By default, if the admin option flag is not specified, it is set to false. */
   public OptionValidator(String optionName) {
+    this(optionName, false);
+  }
+
+  public OptionValidator(String optionName, boolean isAdminOption) {
     this.optionName = optionName;
+    this.isAdminOption = isAdminOption;
   }
 
   /**
@@ -69,6 +76,13 @@ public abstract class OptionValidator {
   }
 
   /**
+   * @return true if option is a system-level property that can only be specified by an admin (not a user).
+   */
+  public boolean isAdminOption() {
+    return isAdminOption;
+  }
+
+  /**
    * Gets the default option value for this validator.
    *
    * @return default option value

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 115ea47..ee94493 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -147,7 +147,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR,
       ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR,
       ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR,
-      ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR
+      ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR,
+      ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR
     };
     final Map<String, OptionValidator> tmp = new HashMap<>();
     for (final OptionValidator validator : validators) {

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index d015040..b4074ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -141,25 +141,41 @@ public class TypeValidators {
 
   public static class BooleanValidator extends TypeValidator {
     public BooleanValidator(String name, boolean def) {
-      super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def));
+      this(name, def, false);
+    }
+
+    public BooleanValidator(String name, boolean def, boolean isAdminOption) {
+      super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def), isAdminOption);
     }
   }
 
   public static class StringValidator extends TypeValidator {
     public StringValidator(String name, String def) {
-      super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def));
+      this(name, def, false);
+    }
+
+    public StringValidator(String name, String def, boolean isAdminOption) {
+      super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def), isAdminOption);
     }
   }
 
   public static class LongValidator extends TypeValidator {
     public LongValidator(String name, long def) {
-      super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def));
+      this(name, def, false);
+    }
+
+    public LongValidator(String name, long def, boolean isAdminOption) {
+      super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def), isAdminOption);
     }
   }
 
   public static class DoubleValidator extends TypeValidator {
     public DoubleValidator(String name, double def) {
-      super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def));
+      this(name, def, false);
+    }
+
+    public DoubleValidator(String name, double def, boolean isAdminOption) {
+      super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def), isAdminOption);
     }
   }
 
@@ -184,22 +200,6 @@ public class TypeValidators {
     }
   }
 
-  public static class AdminOptionValidator extends StringValidator {
-    public AdminOptionValidator(String name, String def) {
-      super(name, def);
-    }
-
-    @Override
-    public void validate(final OptionValue v, final OptionManager manager) {
-      if (v.type != OptionType.SYSTEM) {
-        throw UserException.validationError()
-            .message("Admin related settings can only be set at SYSTEM level scope. Given scope '%s'.", v.type)
-            .build(logger);
-      }
-      super.validate(v, manager);
-    }
-  }
-
   /**
    * Validator that checks if the given value is included in a list of acceptable values. Case insensitive.
    */
@@ -229,7 +229,11 @@ public class TypeValidators {
     private final OptionValue defaultValue;
 
     public TypeValidator(final String name, final Kind kind, final OptionValue defValue) {
-      super(name);
+      this(name, kind, defValue, false);
+    }
+
+    public TypeValidator(final String name, final Kind kind, final OptionValue defValue, final boolean isAdminOption) {
+      super(name, isAdminOption);
       checkArgument(defValue.type == OptionType.SYSTEM, "Default value must be SYSTEM type.");
       this.kind = kind;
       this.defaultValue = defValue;
@@ -248,6 +252,11 @@ public class TypeValidators {
               kind.name(), v.kind.name()))
             .build(logger);
       }
+      if (isAdminOption() && v.type != OptionType.SYSTEM) {
+        throw UserException.validationError()
+            .message("Admin related settings can only be set at SYSTEM level scope. Given scope '%s'.", v.type)
+            .build(logger);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
index 248c3cb..ea38278 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.drill.common.collections.ImmutableEntry;
-
 public abstract class BasePersistentStore<V> implements PersistentStore<V> {
 
   @Override
@@ -29,4 +29,18 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> {
     return getRange(0, Integer.MAX_VALUE);
   }
 
+  /** By default get with version will behave the same way as without version.
+   * Override this method to add version support. */
+  @Override
+  public V get(String key, DataChangeVersion version) {
+    return get(key);
+  }
+
+  /** By default put with version will behave the same way as without version.
+   * Override this method to add version support. */
+  @Override
+  public void put(String key, V value, DataChangeVersion version) {
+    put(key, value);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
index 767b1d5..bb23752 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+
 import java.util.Iterator;
 import java.util.Map;
 
@@ -38,6 +40,14 @@ public interface PersistentStore<V> extends AutoCloseable {
   V get(String key);
 
   /**
+   * Returns the value for the given key if exists, null otherwise.
+   * Sets data change version number.
+   * @param key  lookup key
+   * @param version version holder
+   */
+  V get(String key, DataChangeVersion version);
+
+  /**
    * Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}.
    *
    * @param key  lookup key
@@ -45,6 +55,17 @@ public interface PersistentStore<V> extends AutoCloseable {
    */
   void put(String key, V value);
 
+  /**
+   * Stores the (key, value) tuple in the store.
+   * If tuple already exists, stores it only if versions match,
+   * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException}
+   * Lifetime of the tuple depends upon store {@link #getMode mode}.
+   *
+   * @param key  lookup key
+   * @param value  value to store
+   * @param version version holder
+   */
+  void put(String key, V value, DataChangeVersion version);
 
   /**
    * Removes the value corresponding to the given key if exists, nothing happens otherwise.

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
new file mode 100644
index 0000000..10c1b8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+public class DataChangeVersion {
+
+  private int version;
+
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
index 3dde4b8..55f72c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -63,7 +63,17 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
 
   @Override
   public V get(final String key) {
-    final byte[] bytes = client.get(key);
+    return get(key, false, null);
+  }
+
+  @Override
+  public V get(final String key, DataChangeVersion version) {
+    return get(key, true, version);
+  }
+
+  public V get(final String key, boolean consistencyFlag, DataChangeVersion version) {
+    byte[] bytes = client.get(key, consistencyFlag, version);
+
     if (bytes == null) {
       return null;
     }
@@ -76,28 +86,30 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
 
   @Override
   public void put(final String key, final V value) {
+    put(key, value, null);
+  }
+
+  @Override
+  public void put(final String key, final V value, DataChangeVersion version) {
     final InstanceSerializer<V> serializer = config.getSerializer();
     try {
       final byte[] bytes = serializer.serialize(value);
-      client.put(key, bytes);
+      client.put(key, bytes, version);
     } catch (final IOException e) {
       throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e);
     }
   }
 
+
   @Override
   public boolean putIfAbsent(final String key, final V value) {
-    final V old = get(key);
-    if (old == null) {
-      try {
-        final byte[] bytes = config.getSerializer().serialize(value);
-        client.put(key, bytes);
-        return true;
-      } catch (final IOException e) {
-        throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
-      }
+    try {
+      final byte[] bytes = config.getSerializer().serialize(value);
+      final byte[] data = client.putIfAbsent(key, bytes);
+      return data == null;
+    } catch (final IOException e) {
+      throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
     }
-    return false;
   }
 
   @Override


Mime
View raw message