tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [38/50] incubator-tinkerpop git commit: TINKERPOP3-988 Change GraphComputer executors
Date Thu, 03 Dec 2015 16:43:34 GMT
TINKERPOP3-988 Change GraphComputer executors

Changed (Giraph|Spark)GraphComputer's submit methods to use a custom
executor instead of ForkJoinPool.commonPool.

These submit methods do very little in the calling thread; most of the
work is done in an asynchronously-executed completable future.

If the async task executes on ForkJoinPool.commonPool (the default if
no executor is supplied), then two problems can arise:

1. Loss of context classloader

The context classloader of the thread that called submit is not
necessarily the same as the context classloader of the common fork-join
pool's threads. This matters because several pieces of code reachable
from submit's async task rely on the context classloader. SparkMemory is
one; Hadoop's UserGroupInformation is another, depending on the
credentials configuration (UGI is reached indirectly via
FileSystem.get). In practice, this means the caller has to use
whatever context classloader the common fork-join pool's threads
happen to have, or else bad things can happen, such as
nonsensical-looking ClassCastExceptions.

2. Inability to set context classloader

When System.getSecurityManager() != null, the common fork-join pool
switches from its default worker thread factory implementation to a
more restrictive alternative called
InnocuousForkJoinWorkerThreadFactory. Threads created by this factory
can't call setContextClassLoader; attempting to do so throws a
SecurityException. However, UserGroupInformation.newLoginContext must
be able to call setContextClassLoader: it saves the context classloader
(CCL) to a variable, does some work, then restores the CCL from that
variable. This is impossible if setContextClassLoader throws a
SecurityException. So, if a security manager is present in the VM,
submit's async task can die in FileSystem.get -> UGI before any useful work even begins.

This commit introduces shared logic for creating and shutting down an
executor obtained from Executors.newSingleThreadExecutor. I considered
pulling it up to AbstractHadoopGraphComputer, the common public base
class of both Spark- and GiraphGraphComputer, but I couldn't see a way
to do that while preserving compatibility for third-party computer
implementations that extend the same base class. That's why I put the
common logic in a new static helper method instead.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/562d43af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/562d43af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/562d43af

Branch: refs/heads/TINKERPOP3-982
Commit: 562d43af8f0063aa5a77d730480c4efa9b8b992f
Parents: 322668b
Author: Dan LaRocque <dalaro@hopcount.org>
Authored: Tue Dec 1 08:23:42 2015 -0500
Committer: Dan LaRocque <dalaro@hopcount.org>
Committed: Tue Dec 1 08:27:23 2015 -0500

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 11 ++-
 .../computer/util/ComputerSubmissionHelper.java | 80 ++++++++++++++++++++
 .../process/computer/SparkGraphComputer.java    |  9 ++-
 3 files changed, 97 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/562d43af/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 67e8ea3..646b707 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -38,6 +38,7 @@ import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexInputFormat;
 import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
@@ -57,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.NotSerializableException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
@@ -112,8 +114,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer
imple
 
     @Override
     public Future<ComputerResult> submit() {
-        final long startTime = System.currentTimeMillis();
         super.validateStatePriorToExecution();
+
+        return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor,
"GiraphSubmitter");
+    }
+
+    private Future<ComputerResult> submitWithExecutor(Executor exec) {
+        final long startTime = System.currentTimeMillis();
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             try {
@@ -128,7 +135,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer
imple
 
             this.memory.setRuntime(System.currentTimeMillis() - startTime);
             return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration,
this.resultGraph, this.persist), this.memory.asImmutable());
-        });
+        }, exec);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/562d43af/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
new file mode 100644
index 0000000..daad7ef
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Supplier;
+
+import com.google.common.base.Function;
+
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+
+public final class ComputerSubmissionHelper {
+    private ComputerSubmissionHelper() {} // holder for static helpers only; never instantiated
+
+    /**
+     * Creates a {@link Executors#newSingleThreadExecutor(ThreadFactory)} configured to
+     * make threads that behave like the caller, passes it to a closure, and shuts it down.
+     * <p>
+     * This is intended to serve as an alternative to {@link ForkJoinPool#commonPool()},
+     * which is used by {@link CompletableFuture#supplyAsync(Supplier)} (among other methods).
+     * The single-threaded executor created by this method runs tasks on a thread with
+     * the same context classloader and thread group as the thread that called this method.
+     * Threads created by this method also have predictable behavior when
+     * {@link Thread#setContextClassLoader(ClassLoader)} is invoked; threads in the
+     * common pool throw a SecurityException if the JVM has a security manager configured.
+     * <p>
+     * The executor is shut down with {@link ExecutorService#shutdown()} once the closure returns
+     * or throws: already-submitted tasks still run, but no new tasks are accepted afterwards.
+     * <p>
+     * The name of the thread created by this method's internal executor is the concatenation of
+     * <ul>
+     *     <li>the name of the thread that calls this method</li>
+     *     <li>"-TP-"</li>
+     *     <li>the {@code threadNameSuffix} parameter value</li>
+     * </ul>
+     *
+     * @param closure arbitrary code that has exclusive use of the supplied executor
+     * @param threadNameSuffix a string appended to the executor's thread's name
+     * @return the return value of the {@code closure} parameter
+     */
+    public static Future<ComputerResult> runWithBackgroundThread(
+            final Function<Executor, Future<ComputerResult>> closure, final String threadNameSuffix) {
+        final Thread callingThread = Thread.currentThread();
+        final ClassLoader classLoader = callingThread.getContextClassLoader();
+        final ThreadGroup threadGroup = callingThread.getThreadGroup();
+        final String threadName = callingThread.getName() + "-TP-" + threadNameSuffix;
+        final ExecutorService submissionExecutor = Executors.newSingleThreadExecutor(runnable -> {
+            final Thread thread = new Thread(threadGroup, runnable, threadName);
+            thread.setContextClassLoader(classLoader);
+            return thread;
+        });
+        try {
+            return closure.apply(submissionExecutor);
+        } finally {
+            submissionExecutor.shutdown(); // not shutdownNow(), which could drop or interrupt the closure's task
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/562d43af/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 6425aab..e7566d5 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -33,6 +33,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -54,6 +55,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
@@ -97,6 +99,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
     @Override
     public Future<ComputerResult> submit() {
         this.validateStatePriorToExecution();
+
+        return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor,
"SparkSubmitter");
+    }
+
+    private Future<ComputerResult> submitWithExecutor(Executor exec) {
         // apache and hadoop configurations that are used throughout the graph computer computation
         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
         apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES,
this.persist.equals(GraphComputer.Persist.EDGES));
@@ -224,7 +231,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                 if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT,
false))
                     sparkContext.stop();
             }
-        });
+        }, exec);
     }
 
     /////////////////


Mime
View raw message