Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2244D1816B for ; Fri, 7 Aug 2015 15:45:51 +0000 (UTC) Received: (qmail 57075 invoked by uid 500); 7 Aug 2015 15:45:35 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 56958 invoked by uid 500); 7 Aug 2015 15:45:35 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 56264 invoked by uid 99); 7 Aug 2015 15:45:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Aug 2015 15:45:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 29ED4E7155; Fri, 7 Aug 2015 15:45:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samt@apache.org To: commits@cassandra.apache.org Date: Fri, 07 Aug 2015 15:45:46 -0000 Message-Id: In-Reply-To: <9984313fa17644bca62b6bcf1297690f@git.apache.org> References: <9984313fa17644bca62b6bcf1297690f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0 Merge branch 'cassandra-2.2' into cassandra-3.0 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65c695c9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65c695c9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65c695c9 Branch: refs/heads/cassandra-3.0 Commit: 65c695c9186afb80a2c4d6b904b66524ebbe0754 Parents: 6818ba9 39c7869 Author: Sam Tunnicliffe Authored: Fri Aug 7 16:12:55 2015 +0100 Committer: Sam Tunnicliffe Committed: Fri Aug 7 16:12:55 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../JMXEnabledScheduledThreadPoolExecutor.java | 137 ------------------- ...EnabledScheduledThreadPoolExecutorMBean.java | 26 ---- .../JMXEnabledThreadPoolExecutor.java | 5 + .../cql3/functions/JavaBasedUDFunction.java | 21 ++- .../cql3/functions/ScriptBasedUDFunction.java | 17 +-- .../cassandra/db/HintedHandOffManager.java | 26 +++- 7 files changed, 41 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index caf6fb4,98c8d73..13614cc --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -68,8 -30,9 +68,9 @@@ Merged from 2.1 * Handle corrupt files on startup (CASSANDRA-9686) * Fix clientutil jar and tests (CASSANDRA-9760) * (cqlsh) Allow the SSL protocol version to be specified through the - config file or environment variables (CASSANDRA-9544) + config file or environment variables (CASSANDRA-9544) Merged from 2.0: + * Remove erroneous pending HH tasks from tpstats/jmx (CASSANDRA-9129) * Don't cast expected bf size to an int (CASSANDRA-9959) * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765) * Complete CASSANDRA-8448 fix (CASSANDRA-9519) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java index 2b86701,2b86701..a7a54f2 --- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java @@@ -53,6 -53,6 +53,11 @@@ public class JMXEnabledThreadPoolExecut this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory(threadPoolName, priority), "internal"); } ++ public JMXEnabledThreadPoolExecutor(NamedThreadFactory threadFactory, String jmxPath) ++ { ++ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory, jmxPath); ++ } ++ public JMXEnabledThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit, http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java index e066581,0000000..3d8ebd9 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java @@@ -1,645 -1,0 +1,640 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3.functions; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import java.nio.ByteBuffer; +import java.security.CodeSource; +import java.security.PermissionCollection; +import java.security.ProtectionDomain; +import java.security.SecureClassLoader; +import java.security.cert.Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; - import java.util.concurrent.ConcurrentHashMap; - import java.util.concurrent.ExecutorService; - import java.util.concurrent.ThreadLocalRandom; ++import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.io.ByteStreams; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.DataType; - import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; ++import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; - import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.utils.FBUtilities; +import org.eclipse.jdt.core.compiler.IProblem; +import org.eclipse.jdt.internal.compiler.*; +import org.eclipse.jdt.internal.compiler.Compiler; +import org.eclipse.jdt.internal.compiler.classfmt.ClassFileReader; +import org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException; +import org.eclipse.jdt.internal.compiler.env.ICompilationUnit; +import org.eclipse.jdt.internal.compiler.env.INameEnvironment; +import org.eclipse.jdt.internal.compiler.env.NameEnvironmentAnswer; +import org.eclipse.jdt.internal.compiler.impl.CompilerOptions; +import org.eclipse.jdt.internal.compiler.problem.DefaultProblemFactory; + +final class JavaBasedUDFunction extends UDFunction +{ + private static final String BASE_PACKAGE = "org.apache.cassandra.cql3.udf.gen"; + + static final Logger logger = LoggerFactory.getLogger(JavaBasedUDFunction.class); + + private static final AtomicInteger classSequence = new AtomicInteger(); + - private static final JMXEnabledScheduledThreadPoolExecutor executor = - new JMXEnabledScheduledThreadPoolExecutor( - DatabaseDescriptor.getMaxHintsThread(), - new NamedThreadFactory("UserDefinedFunctions", - Thread.MIN_PRIORITY, - udfClassLoader, - new SecurityThreadGroup("UserDefinedFunctions", null)), - "userfunction"); ++ private static final JMXEnabledThreadPoolExecutor executor = ++ new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedFunctions", ++ Thread.MIN_PRIORITY, ++ udfClassLoader, ++ new SecurityThreadGroup("UserDefinedFunctions", null)), ++ "userfunction"); + + private static final EcjTargetClassLoader targetClassLoader = new EcjTargetClassLoader(); + + private static final UDFByteCodeVerifier udfByteCodeVerifier = new UDFByteCodeVerifier(); + + private static final ProtectionDomain protectionDomain; + + private static final IErrorHandlingPolicy errorHandlingPolicy = DefaultErrorHandlingPolicies.proceedWithAllProblems(); + private static final IProblemFactory problemFactory = new DefaultProblemFactory(Locale.ENGLISH); + private static final CompilerOptions compilerOptions; + + /** + * Poor man's template - just a text file splitted at '#' chars. + * Each string at an even index is a constant string (just copied), + * each string at an odd index is an 'instruction'. + */ + private static final String[] javaSourceTemplate; + + static + { + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "forName"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getClassLoader"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResource"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResourceAsStream"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "clearAssertionStatus"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResource"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResourceAsStream"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResources"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemClassLoader"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResource"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResourceAsStream"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResources"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "loadClass"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setClassAssertionStatus"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setDefaultAssertionStatus"); + udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setPackageAssertionStatus"); + udfByteCodeVerifier.addDisallowedMethodCall("java/nio/ByteBuffer", "allocateDirect"); + + Map settings = new HashMap<>(); + settings.put(CompilerOptions.OPTION_LineNumberAttribute, + CompilerOptions.GENERATE); + settings.put(CompilerOptions.OPTION_SourceFileAttribute, + CompilerOptions.DISABLED); + settings.put(CompilerOptions.OPTION_ReportDeprecation, + CompilerOptions.IGNORE); + settings.put(CompilerOptions.OPTION_Source, + CompilerOptions.VERSION_1_8); + settings.put(CompilerOptions.OPTION_TargetPlatform, + CompilerOptions.VERSION_1_8); + + compilerOptions = new CompilerOptions(settings); + compilerOptions.parseLiteralExpressionsAsConstants = true; + + try (InputStream input = JavaBasedUDFunction.class.getResource("JavaSourceUDF.txt").openConnection().getInputStream()) + { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + FBUtilities.copy(input, output, Long.MAX_VALUE); + String template = output.toString(); + + StringTokenizer st = new StringTokenizer(template, "#"); + javaSourceTemplate = new String[st.countTokens()]; + for (int i = 0; st.hasMoreElements(); i++) + javaSourceTemplate[i] = st.nextToken(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + CodeSource codeSource; + try + { + codeSource = new CodeSource(new URL("udf", "localhost", 0, "/java", new URLStreamHandler() + { + protected URLConnection openConnection(URL u) + { + return null; + } + }), (Certificate[])null); + } + catch (MalformedURLException e) + { + throw new RuntimeException(e); + } + + protectionDomain = new ProtectionDomain(codeSource, ThreadAwareSecurityManager.noPermissions, targetClassLoader, null); + } + + private final JavaUDF javaUDF; + + JavaBasedUDFunction(FunctionName name, List argNames, List> argTypes, + AbstractType returnType, boolean calledOnNullInput, String body) + { + super(name, argNames, argTypes, UDHelper.driverTypes(argTypes), + returnType, UDHelper.driverType(returnType), calledOnNullInput, "java", body); + + // javaParamTypes is just the Java representation for argTypes resp. argDataTypes + Class[] javaParamTypes = UDHelper.javaTypes(argDataTypes, calledOnNullInput); + // javaReturnType is just the Java representation for returnType resp. returnDataType + Class javaReturnType = returnDataType.asJavaClass(); + + // put each UDF in a separate package to prevent cross-UDF code access + String pkgName = BASE_PACKAGE + '.' + generateClassName(name, 'p'); + String clsName = generateClassName(name, 'C'); + + String executeInternalName = generateClassName(name, 'x'); + + StringBuilder javaSourceBuilder = new StringBuilder(); + int lineOffset = 1; + for (int i = 0; i < javaSourceTemplate.length; i++) + { + String s = javaSourceTemplate[i]; + + // strings at odd indexes are 'instructions' + if ((i & 1) == 1) + { + switch (s) + { + case "package_name": + s = pkgName; + break; + case "class_name": + s = clsName; + break; + case "body": + lineOffset = countNewlines(javaSourceBuilder); + s = body; + break; + case "arguments": + s = generateArguments(javaParamTypes, argNames); + break; + case "argument_list": + s = generateArgumentList(javaParamTypes, argNames); + break; + case "return_type": + s = javaSourceName(javaReturnType); + break; + case "execute_internal_name": + s = executeInternalName; + break; + } + } + + javaSourceBuilder.append(s); + } + + String targetClassName = pkgName + '.' + clsName; + + String javaSource = javaSourceBuilder.toString(); + + logger.debug("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource); + + try + { + EcjCompilationUnit compilationUnit = new EcjCompilationUnit(javaSource, targetClassName); + + org.eclipse.jdt.internal.compiler.Compiler compiler = new Compiler(compilationUnit, + errorHandlingPolicy, + compilerOptions, + compilationUnit, + problemFactory); + compiler.compile(new ICompilationUnit[]{ compilationUnit }); + + if (compilationUnit.problemList != null && !compilationUnit.problemList.isEmpty()) + { + boolean fullSource = false; + StringBuilder problems = new StringBuilder(); + for (IProblem problem : compilationUnit.problemList) + { + long ln = problem.getSourceLineNumber() - lineOffset; + if (ln < 1L) + { + if (problem.isError()) + { + // if generated source around UDF source provided by the user is buggy, + // this code is appended. + problems.append("GENERATED SOURCE ERROR: line ") + .append(problem.getSourceLineNumber()) + .append(" (in generated source): ") + .append(problem.getMessage()) + .append('\n'); + fullSource = true; + } + } + else + { + problems.append("Line ") + .append(Long.toString(ln)) + .append(": ") + .append(problem.getMessage()) + .append('\n'); + } + } + + if (fullSource) + throw new InvalidRequestException("Java source compilation failed:\n" + problems + "\n generated source:\n" + javaSource); + else + throw new InvalidRequestException("Java source compilation failed:\n" + problems); + } + + // Verify the UDF bytecode against use of probably dangerous code + Set errors = udfByteCodeVerifier.verify(targetClassLoader.classData(targetClassName)); + String validDeclare = "not allowed method declared: " + executeInternalName + '('; + String validCall = "call to " + targetClassName.replace('.', '/') + '.' + executeInternalName + "()"; + for (Iterator i = errors.iterator(); i.hasNext();) + { + String error = i.next(); + // we generate a random name of the private, internal execute method, which is detected by the byte-code verifier + if (error.startsWith(validDeclare) || error.equals(validCall)) + { + i.remove(); + } + } + if (!errors.isEmpty()) + throw new InvalidRequestException("Java UDF validation failed: " + errors); + + // Load the class and create a new instance of it + Thread thread = Thread.currentThread(); + ClassLoader orig = thread.getContextClassLoader(); + try + { + thread.setContextClassLoader(UDFunction.udfClassLoader); + // Execute UDF intiialization from UDF class loader + + Class cls = Class.forName(targetClassName, false, targetClassLoader); + + if (cls.getDeclaredMethods().length != 2 || cls.getDeclaredConstructors().length != 1) + throw new InvalidRequestException("Check your source to not define additional Java methods or constructors"); + MethodType methodType = MethodType.methodType(void.class) + .appendParameterTypes(DataType.class, DataType[].class); + MethodHandle ctor = MethodHandles.lookup().findConstructor(cls, methodType); + this.javaUDF = (JavaUDF) ctor.invokeWithArguments(returnDataType, argDataTypes); + } + finally + { + thread.setContextClassLoader(orig); + } + } + catch (InvocationTargetException e) + { + // in case of an ITE, use the cause + throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e.getCause())); + } + catch (VirtualMachineError e) + { + throw e; + } + catch (Throwable e) + { + throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e)); + } + } + + protected ExecutorService executor() + { + return executor; + } + + protected ByteBuffer executeUserDefined(int protocolVersion, List params) + { + return javaUDF.executeImpl(protocolVersion, params); + } + + + private static int countNewlines(StringBuilder javaSource) + { + int ln = 0; + for (int i = 0; i < javaSource.length(); i++) + if (javaSource.charAt(i) == '\n') + ln++; + return ln; + } + + private static String generateClassName(FunctionName name, char prefix) + { + String qualifiedName = name.toString(); + + StringBuilder sb = new StringBuilder(qualifiedName.length() + 10); + sb.append(prefix); + for (int i = 0; i < qualifiedName.length(); i++) + { + char c = qualifiedName.charAt(i); + if (Character.isJavaIdentifierPart(c)) + sb.append(c); + else + sb.append(Integer.toHexString(((short)c)&0xffff)); + } + sb.append('_') + .append(ThreadLocalRandom.current().nextInt() & 0xffffff) + .append('_') + .append(classSequence.incrementAndGet()); + return sb.toString(); + } + + private static String javaSourceName(Class type) + { + String n = type.getName(); + return n.startsWith("java.lang.") ? type.getSimpleName() : n; + } + + private static String generateArgumentList(Class[] paramTypes, List argNames) + { + // initial builder size can just be a guess (prevent temp object allocations) + StringBuilder code = new StringBuilder(32 * paramTypes.length); + for (int i = 0; i < paramTypes.length; i++) + { + if (i > 0) + code.append(", "); + code.append(javaSourceName(paramTypes[i])) + .append(' ') + .append(argNames.get(i)); + } + return code.toString(); + } + + private static String generateArguments(Class[] paramTypes, List argNames) + { + StringBuilder code = new StringBuilder(64 * paramTypes.length); + for (int i = 0; i < paramTypes.length; i++) + { + if (i > 0) + code.append(",\n"); + + if (logger.isDebugEnabled()) + code.append(" /* parameter '").append(argNames.get(i)).append("' */\n"); + + code + // cast to Java type + .append(" (").append(javaSourceName(paramTypes[i])).append(") ") + // generate object representation of input parameter (call UDFunction.compose) + .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))"); + } + return code.toString(); + } + + private static String composeMethod(Class type) + { + return (type.isPrimitive()) ? ("super.compose_" + type.getName()) : "super.compose"; + } + + // Java source UDFs are a very simple compilation task, which allows us to let one class implement + // all interfaces required by ECJ. + static final class EcjCompilationUnit implements ICompilationUnit, ICompilerRequestor, INameEnvironment + { + List problemList; + private final String className; + private final char[] sourceCode; + + EcjCompilationUnit(String sourceCode, String className) + { + this.className = className; + this.sourceCode = sourceCode.toCharArray(); + } + + // ICompilationUnit + + @Override + public char[] getFileName() + { + return sourceCode; + } + + @Override + public char[] getContents() + { + return sourceCode; + } + + @Override + public char[] getMainTypeName() + { + int dot = className.lastIndexOf('.'); + return ((dot > 0) ? className.substring(dot + 1) : className).toCharArray(); + } + + @Override + public char[][] getPackageName() + { + StringTokenizer izer = new StringTokenizer(className, "."); + char[][] result = new char[izer.countTokens() - 1][]; + for (int i = 0; i < result.length; i++) + result[i] = izer.nextToken().toCharArray(); + return result; + } + + @Override + public boolean ignoreOptionalProblems() + { + return false; + } + + // ICompilerRequestor + + @Override + public void acceptResult(CompilationResult result) + { + if (result.hasErrors()) + { + IProblem[] problems = result.getProblems(); + if (problemList == null) + problemList = new ArrayList<>(problems.length); + Collections.addAll(problemList, problems); + } + else + { + ClassFile[] classFiles = result.getClassFiles(); + for (ClassFile classFile : classFiles) + targetClassLoader.addClass(className, classFile.getBytes()); + } + } + + // INameEnvironment + + @Override + public NameEnvironmentAnswer findType(char[][] compoundTypeName) + { + StringBuilder result = new StringBuilder(); + for (int i = 0; i < compoundTypeName.length; i++) + { + if (i > 0) + result.append('.'); + result.append(compoundTypeName[i]); + } + return findType(result.toString()); + } + + @Override + public NameEnvironmentAnswer findType(char[] typeName, char[][] packageName) + { + StringBuilder result = new StringBuilder(); + int i = 0; + for (; i < packageName.length; i++) + { + if (i > 0) + result.append('.'); + result.append(packageName[i]); + } + if (i > 0) + result.append('.'); + result.append(typeName); + return findType(result.toString()); + } + + private NameEnvironmentAnswer findType(String className) + { + if (className.equals(this.className)) + { + return new NameEnvironmentAnswer(this, null); + } + + String resourceName = className.replace('.', '/') + ".class"; + + try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName)) + { + if (is != null) + { + byte[] classBytes = ByteStreams.toByteArray(is); + char[] fileName = className.toCharArray(); + ClassFileReader classFileReader = new ClassFileReader(classBytes, fileName, true); + return new NameEnvironmentAnswer(classFileReader, null); + } + } + catch (IOException | ClassFormatException exc) + { + throw new RuntimeException(exc); + } + return null; + } + + private boolean isPackage(String result) + { + if (result.equals(this.className)) + return false; + String resourceName = result.replace('.', '/') + ".class"; + try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName)) + { + return is == null; + } + catch (IOException e) + { + // we are here, since close on is failed. That means it was not null + return false; + } + } + + @Override + public boolean isPackage(char[][] parentPackageName, char[] packageName) + { + StringBuilder result = new StringBuilder(); + int i = 0; + if (parentPackageName != null) + for (; i < parentPackageName.length; i++) + { + if (i > 0) + result.append('.'); + result.append(parentPackageName[i]); + } + + if (Character.isUpperCase(packageName[0]) && !isPackage(result.toString())) + return false; + if (i > 0) + result.append('.'); + result.append(packageName); + + return isPackage(result.toString()); + } + + @Override + public void cleanup() + { + } + } + + static final class EcjTargetClassLoader extends SecureClassLoader + { + EcjTargetClassLoader() + { + super(UDFunction.udfClassLoader); + } + + // This map is usually empty. + // It only contains data *during* UDF compilation but not during runtime. + // + // addClass() is invoked by ECJ after successful compilation of the generated Java source. + // loadClass(targetClassName) is invoked by buildUDF() after ECJ returned from successful compilation. + // + private final Map classes = new ConcurrentHashMap<>(); + + void addClass(String className, byte[] classData) + { + classes.put(className, classData); + } + + byte[] classData(String className) + { + return classes.get(className); + } + + protected Class findClass(String name) throws ClassNotFoundException + { + // remove the class binary - it's only used once - so it's wasting heap + byte[] classData = classes.remove(name); + + if (classData != null) + return defineClass(name, classData, 0, classData.length, protectionDomain); + + return getParent().loadClass(name); + } + + protected PermissionCollection getPermissions(CodeSource codesource) + { + return ThreadAwareSecurityManager.noPermissions; + } + }} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java index d79960f,0000000..8b448fe mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java @@@ -1,265 -1,0 +1,262 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.functions; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import java.nio.ByteBuffer; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.CodeSource; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.security.ProtectionDomain; +import java.security.cert.Certificate; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import javax.script.Bindings; +import javax.script.Compilable; +import javax.script.CompiledScript; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineFactory; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import javax.script.SimpleScriptContext; + - import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; ++import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; - import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.exceptions.InvalidRequestException; + +final class ScriptBasedUDFunction extends UDFunction +{ + static final Map scriptEngines = new HashMap<>(); + + private static final ProtectionDomain protectionDomain; + private static final AccessControlContext accessControlContext; + + // + // For scripted UDFs we have to rely on the security mechanisms of the scripting engine and + // SecurityManager - especially SecurityManager.checkPackageAccess(). Unlike Java-UDFs, strict checking + // of class access via the UDF class loader is not possible, since e.g. Nashorn builds its own class loader + // (jdk.nashorn.internal.runtime.ScriptLoader / jdk.nashorn.internal.runtime.NashornLoader) configured with + // a system class loader. + // + private static final String[] allowedPackagesArray = + { + // following required by jdk.nashorn.internal.objects.Global.initJavaAccess() + "", + "com", + "edu", + "java", + "javax", + "javafx", + "org", + // following required by Nashorn runtime + "java.lang", + "java.lang.invoke", + "java.lang.reflect", + "java.nio.charset", + "java.util", + "java.util.concurrent", + "javax.script", + "sun.reflect", + "jdk.internal.org.objectweb.asm.commons", + "jdk.nashorn.internal.runtime", + "jdk.nashorn.internal.runtime.linker", + // following required by Java Driver + "java.math", + "java.text", + "com.google.common.base", + "com.google.common.reflect", + // following required by UDF + "com.datastax.driver.core", + "com.datastax.driver.core.utils" + }; + - private static final JMXEnabledScheduledThreadPoolExecutor executor = - new JMXEnabledScheduledThreadPoolExecutor( - DatabaseDescriptor.getMaxHintsThread(), - new NamedThreadFactory("UserDefinedScriptFunctions", - Thread.MIN_PRIORITY, - udfClassLoader, - new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))), - "userscripts"); ++ private static final JMXEnabledThreadPoolExecutor executor = ++ new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedScriptFunctions", ++ Thread.MIN_PRIORITY, ++ udfClassLoader, ++ new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))), ++ "userscripts"); + + static + { + ScriptEngineManager scriptEngineManager = new ScriptEngineManager(); + for (ScriptEngineFactory scriptEngineFactory : scriptEngineManager.getEngineFactories()) + { + ScriptEngine scriptEngine = scriptEngineFactory.getScriptEngine(); + boolean compilable = scriptEngine instanceof Compilable; + if (compilable) + { + logger.info("Found scripting engine {} {} - {} {} - language names: {}", + scriptEngineFactory.getEngineName(), scriptEngineFactory.getEngineVersion(), + scriptEngineFactory.getLanguageName(), scriptEngineFactory.getLanguageVersion(), + scriptEngineFactory.getNames()); + for (String name : scriptEngineFactory.getNames()) + scriptEngines.put(name, (Compilable) scriptEngine); + } + } + + try + { + protectionDomain = new ProtectionDomain(new CodeSource(new URL("udf", "localhost", 0, "/script", new URLStreamHandler() + { + protected URLConnection openConnection(URL u) + { + return null; + } + }), (Certificate[]) null), ThreadAwareSecurityManager.noPermissions); + } + catch (MalformedURLException e) + { + throw new RuntimeException(e); + } + accessControlContext = new AccessControlContext(new ProtectionDomain[]{ protectionDomain }); + } + + private final CompiledScript script; + + ScriptBasedUDFunction(FunctionName name, + List argNames, + List> argTypes, + AbstractType returnType, + boolean calledOnNullInput, + String language, + String body) + { + super(name, argNames, argTypes, returnType, calledOnNullInput, language, body); + + Compilable scriptEngine = scriptEngines.get(language); + if (scriptEngine == null) + throw new InvalidRequestException(String.format("Invalid language '%s' for function '%s'", language, name)); + + // execute compilation with no-permissions to prevent evil code e.g. via "static code blocks" / "class initialization" + try + { + this.script = AccessController.doPrivileged((PrivilegedExceptionAction) () -> scriptEngine.compile(body), + accessControlContext); + } + catch (PrivilegedActionException x) + { + Throwable e = x.getCause(); + logger.info("Failed to compile function '{}' for language {}: ", name, language, e); + throw new InvalidRequestException( + String.format("Failed to compile function '%s' for language %s: %s", name, language, e)); + } + } + + protected ExecutorService executor() + { + return executor; + } + + public ByteBuffer executeUserDefined(int protocolVersion, List parameters) + { + Object[] params = new Object[argTypes.size()]; + for (int i = 0; i < params.length; i++) + params[i] = compose(protocolVersion, i, parameters.get(i)); + + ScriptContext scriptContext = new SimpleScriptContext(); + scriptContext.setAttribute("javax.script.filename", this.name.toString(), ScriptContext.ENGINE_SCOPE); + Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE); + for (int i = 0; i < params.length; i++) + bindings.put(argNames.get(i).toString(), params[i]); + + Object result; + try + { + // How to prevent Class.forName() _without_ "help" from the script engine ? + // NOTE: Nashorn enforces a special permission to allow class-loading, which is not granted - so it's fine. + + result = script.eval(scriptContext); + } + catch (ScriptException e) + { + throw new RuntimeException(e); + } + if (result == null) + return null; + + Class javaReturnType = returnDataType.asJavaClass(); + Class resultType = result.getClass(); + if (!javaReturnType.isAssignableFrom(resultType)) + { + if (result instanceof Number) + { + Number rNumber = (Number) result; + if (javaReturnType == Integer.class) + result = rNumber.intValue(); + else if (javaReturnType == Long.class) + result = rNumber.longValue(); + else if (javaReturnType == Short.class) + result = rNumber.shortValue(); + else if (javaReturnType == Byte.class) + result = rNumber.byteValue(); + else if (javaReturnType == Float.class) + result = rNumber.floatValue(); + else if (javaReturnType == Double.class) + result = rNumber.doubleValue(); + else if (javaReturnType == BigInteger.class) + { + if (javaReturnType == Integer.class) + result = rNumber.intValue(); + else if (javaReturnType == Short.class) + result = rNumber.shortValue(); + else if (javaReturnType == Byte.class) + result = rNumber.byteValue(); + else if (javaReturnType == Long.class) + result = rNumber.longValue(); + else if (javaReturnType == Float.class) + result = rNumber.floatValue(); + else if (javaReturnType == Double.class) + result = rNumber.doubleValue(); + else if (javaReturnType == BigInteger.class) + { + if (rNumber instanceof BigDecimal) + result = ((BigDecimal) rNumber).toBigInteger(); + else if (rNumber instanceof Double || rNumber instanceof Float) + result = new BigDecimal(rNumber.toString()).toBigInteger(); + else + result = BigInteger.valueOf(rNumber.longValue()); + } + else if (javaReturnType == BigDecimal.class) + // String c'tor of BigDecimal is more accurate than valueOf(double) + result = new BigDecimal(rNumber.toString()); + } + else if (javaReturnType == BigDecimal.class) + // String c'tor of BigDecimal is more accurate than valueOf(double) + result = new BigDecimal(rNumber.toString()); + } + } + + return decompose(protocolVersion, result); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java index 17832d7,dae85b7..4656d41 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@@ -36,13 -37,15 +36,15 @@@ import com.google.common.util.concurren import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; + import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; + import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.Composites; ++ +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; @@@ -57,9 -62,10 +59,11 @@@ import org.apache.cassandra.io.util.Dat import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; - import org.apache.cassandra.service.*; + import org.apache.cassandra.service.StorageProxy; + import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.OpOrder; import org.cliffc.high_scale_lib.NonBlockingHashSet; /**