Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 51130200B8E for ; Sun, 11 Sep 2016 19:16:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4FBAE160AB6; Sun, 11 Sep 2016 17:16:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 75000160AC7 for ; Sun, 11 Sep 2016 19:16:21 +0200 (CEST) Received: (qmail 90446 invoked by uid 500); 11 Sep 2016 17:16:20 -0000 Mailing-List: contact issues-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@drill.apache.org Delivered-To: mailing list issues@drill.apache.org Received: (qmail 90282 invoked by uid 99); 11 Sep 2016 17:16:20 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 11 Sep 2016 17:16:20 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 7DD7E2C1B79 for ; Sun, 11 Sep 2016 17:16:20 +0000 (UTC) Date: Sun, 11 Sep 2016 17:16:20 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@drill.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (DRILL-4726) Dynamic UDFs support MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Sun, 11 Sep 2016 17:16:22 -0000 [ https://issues.apache.org/jira/browse/DRILL-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15482086#comment-15482086 ] ASF GitHub Bot commented on DRILL-4726: --------------------------------------- Github user arina-ielchiieva commented on a diff in the pull request: 
https://github.com/apache/drill/pull/574#discussion_r78298762 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java --- @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.handlers; + +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.commons.io.FileUtils; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.FunctionValidationException; +import org.apache.drill.exec.exception.VersionMismatchException; +import org.apache.drill.exec.expr.fn.RemoteFunctionRegistry; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.DirectPlan; +import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction; +import org.apache.drill.exec.proto.UserBitShared.Func; +import org.apache.drill.exec.proto.UserBitShared.Jar; +import org.apache.drill.exec.proto.UserBitShared.Registry; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; +import org.apache.drill.exec.util.JarUtil; +import org.apache.drill.exec.work.foreman.ForemanSetupException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +public class CreateFunctionHandler extends DefaultSqlHandler { + + private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CreateFunctionHandler.class); + + public CreateFunctionHandler(SqlHandlerConfig config) { + super(config); + } + + /** + * Creates UDFs dynamically. 
+ * + * @return - Single row indicating list of registered UDFs, raise exception otherwise + */ + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException { + if (context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) { + SqlCreateFunction node = unwrap(sqlNode, SqlCreateFunction.class); + String jarName = ((SqlCharStringLiteral) node.getJar()).toValue(); + String sourceName = JarUtil.getSourceName(jarName); + + RemoteFunctionRegistry remoteRegistry = context.getRemoteFunctionRegistry(); + FileSystem fs = remoteRegistry.getFs(); + Path tmpDir = new Path(remoteRegistry.getTmpArea(), UUID.randomUUID().toString()); + File localTmpDir = Files.createTempDir(); + + boolean inProgress = false; + try { + final String action = remoteRegistry.addToJars(jarName, RemoteFunctionRegistry.Action.REGISTRATION); + if (!(inProgress = action == null)) { + return DirectPlan.createDirectPlan(context, false, + String.format("Jar with %s name is used. Action: %s", jarName, action)); + } + + // verify that binary and source exist + Path remoteBinary = new Path(remoteRegistry.getStagingArea(), jarName); + Path remoteSource = new Path(remoteRegistry.getStagingArea(), sourceName); + if (!fs.exists(remoteBinary) || !fs.exists(remoteSource)) { + return DirectPlan.createDirectPlan(context, false, + String.format("Binary [%s] or source [%s] is absent in udf staging area [%s].", jarName, sourceName, remoteRegistry.getStagingArea().toUri().getPath())); + } + + // backup binary & source (copy to udf tmp directory) + fs.mkdirs(tmpDir); + Path tmpBinary = new Path(tmpDir, jarName); + Path tmpSource = new Path(tmpDir, sourceName); + + FileUtil.copy(fs, remoteBinary, fs, tmpBinary, false, fs.getConf()); + FileUtil.copy(fs, remoteSource, fs, tmpSource, false, fs.getConf()); + + // copy binary to local fs, we don't need source for validation + Path localBinary = new Path(new Path(localTmpDir.toURI()), jarName); + fs.copyToLocalFile(tmpBinary, 
localBinary); + + // validate functions locally + List functions; + try { + functions = context.getFunctionRegistry().validate(localBinary); + } catch (FunctionValidationException ex) { + return DirectPlan.createDirectPlan(context, false, ex.getMessage()); + } + + if (functions.size() == 0) { + return DirectPlan.createDirectPlan(context, false, + String.format("Jar %s does not contain functions", jarName)); + } + + // validate and register remotely + Jar jar = Jar.newBuilder().setName(jarName).addAllFunction(functions).build(); + String error = register(remoteRegistry, jar, tmpBinary, tmpSource, remoteRegistry.getRetryTimes()); + + if (error != null) { + return DirectPlan.createDirectPlan(context, false, error); + } + + // remove jars from staging area + try { + fs.delete(remoteBinary, false); + fs.delete(remoteSource, false); + } catch (IOException ex) { + logger.warn("Failed to delete binary {} and source {} from staging area", jarName, sourceName); + } + + return DirectPlan.createDirectPlan(context, true, + String.format("The following UDFs in jar %s have been registered:\n%s", jarName, functions)); + + } finally { + if (inProgress) { + remoteRegistry.removeFromJars(jarName); + } + FileUtils.deleteQuietly(localTmpDir); + if (fs.exists(tmpDir)) { + fs.delete(tmpDir, true); + } + } + } + throw UserException.validationError() + .message("Dynamic UDFs support is disabled.") + .build(logger); + } + + private String validate(Registry registry, Jar jar) { + for (Jar remoteJar : registry.getJarList()) { + if (remoteJar.getName().equals(jar.getName())) { + return String.format("Jar with %s name has been already registered", jar.getName()); + } + for (Func remoteFunction : remoteJar.getFunctionList()) { + for (Func func : jar.getFunctionList()) { + if (remoteFunction.getName().equals(func.getName()) && remoteFunction.getMajorTypeList().equals(func.getMajorTypeList())) { + return String.format("Found duplicated function in %s - %s", remoteJar.getName(), remoteFunction); + 
} + } + } + } + return null; + } + + private String register(RemoteFunctionRegistry remoteFunctionRegistry, Jar jar, Path tmpBinary, Path tmpSource, int retryTimes) throws IOException { + DataChangeVersion version = new DataChangeVersion(); + Registry registry = remoteFunctionRegistry.getRegistry(version); + // validate against remote registry + String error = validate(registry, jar); + if (error == null) { + // copy jars to registry area + FileSystem fs = remoteFunctionRegistry.getFs(); + Path binary = new Path(remoteFunctionRegistry.getRegistryArea(), tmpBinary.getName()); + Path source = new Path(remoteFunctionRegistry.getRegistryArea(), tmpSource.getName()); + FileUtil.copy(fs, tmpBinary, fs, binary, false, true, fs.getConf()); + FileUtil.copy(fs, tmpSource, fs, source, false, true, fs.getConf()); + + // add jar info into remote registry if all validation has passed + List remoteJars = Lists.newArrayList(registry.getJarList()); + remoteJars.add(jar); + Registry updatedRegistry = Registry.newBuilder().addAllJar(remoteJars).build(); + try { + remoteFunctionRegistry.updateRegistry(updatedRegistry, version); + } catch (VersionMismatchException ex) { --- End diff -- I'll add a description. It's how we detect that the remote function registry version has changed. > Dynamic UDFs support > -------------------- > > Key: DRILL-4726 > URL: https://issues.apache.org/jira/browse/DRILL-4726 > Project: Apache Drill > Issue Type: New Feature > Affects Versions: 1.6.0 > Reporter: Arina Ielchiieva > Assignee: Arina Ielchiieva > Fix For: Future > > > Allow registering UDFs without restarting Drillbits. > The design is described in the document below: > https://docs.google.com/document/d/1FfyJtWae5TLuyheHCfldYUpCdeIezR2RlNsrOTYyAB4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332)