tinkerpop-commits mailing list archives

From dkupp...@apache.org
Subject [48/50] [abbrv] incubator-tinkerpop git commit: Allow DFS paths in `HADOOP_GREMLIN_LIBS`.
Date Thu, 09 Jun 2016 16:47:29 GMT
Allow DFS paths in `HADOOP_GREMLIN_LIBS`.
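
The value of HADOOP_GREMLIN_LIBS (system property or environment variable) remains a colon-separated list, but an entry may now be a DFS URI (e.g. an hdfs:// directory) as well as a local directory. The new PATH_PATTERN regex added to AbstractHadoopGraphComputer splits on ':' while leaving the "://" of a URI scheme intact. A minimal sketch of that parsing behavior, using hypothetical paths (class name and paths are illustrative, not part of this commit):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HadoopGremlinLibsParseSketch {
        // same pattern this commit adds to AbstractHadoopGraphComputer
        private static final Pattern PATH_PATTERN = Pattern.compile("([^:]|://)+");

        public static void main(final String[] args) {
            // hypothetical value mixing a local directory and a DFS directory
            final String libs = "/usr/local/hadoop-gremlin/lib:hdfs://namenode/user/gremlin/hadoop-gremlin-libs";
            final Matcher matcher = PATH_PATTERN.matcher(libs);
            while (matcher.find()) {
                // prints "/usr/local/hadoop-gremlin/lib" and then
                // "hdfs://namenode/user/gremlin/hadoop-gremlin-libs"
                System.out.println(matcher.group());
            }
        }
    }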


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/94603501
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/94603501
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/94603501

Branch: refs/heads/TINKERPOP-1331
Commit: 946035019dcaf07edb64b7f920de82486fef1364
Parents: ff52eb6
Author: Daniel Kuppitz <daniel_kuppitz@hotmail.com>
Authored: Thu Jun 9 13:46:34 2016 +0200
Committer: Daniel Kuppitz <daniel_kuppitz@hotmail.com>
Committed: Thu Jun 9 18:45:12 2016 +0200

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 53 ++++++++------------
 .../computer/AbstractHadoopGraphComputer.java   | 46 +++++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 28 ++---------
 3 files changed, 72 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/94603501/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 012b9fc..b06b40a 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -61,13 +61,14 @@ import org.apache.tinkerpop.gremlin.util.Gremlin;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.NotSerializableException;
+import java.net.URI;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -133,8 +134,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             try {
-                final FileSystem fs = FileSystem.get(this.giraphConfiguration);
-                this.loadJars(fs);
+                this.loadJars(giraphConfiguration);
                 ToolRunner.run(this, new String[]{});
             } catch (final Exception e) {
                 //e.printStackTrace();
@@ -247,36 +247,25 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         return this.giraphConfiguration;
     }
 
-    private void loadJars(final FileSystem fs) {
-        final String hadoopGremlinLibsRemote = "hadoop-gremlin-" + Gremlin.version() + "-libs";
-        if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String hadoopGremlinLibsLocal = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == hadoopGremlinLibsLocal)
-                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
-            else {
-                final String[] paths = hadoopGremlinLibsLocal.split(":");
-                for (final String path : paths) {
-                    final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
-                    if (file.exists()) {
-                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> {
-                            try {
-                                final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName());
-                                if (!fs.exists(jarFile))
-                                    fs.copyFromLocalFile(new Path(f.getPath()), jarFile);
-                                try {
-                                    DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs);
-                                } catch (final Exception e) {
-                                    throw new RuntimeException(e.getMessage(), e);
-                                }
-                            } catch (final Exception e) {
-                                throw new IllegalStateException(e.getMessage(), e);
-                            }
-                        });
-                    } else {
-                        this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
-                    }
-                }
+    @Override
+    protected void loadJar(final org.apache.hadoop.conf.Configuration hadoopConfiguration, final File file, final Object... params)
+            throws IOException {
+        final FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration);
+        try {
+            final Path jarFile = new Path(defaultFileSystem.getHomeDirectory() + "/hadoop-gremlin-" + Gremlin.version() + "-libs/" + file.getName());
+            if (!defaultFileSystem.exists(jarFile)) {
+                final Path sourcePath = new Path(file.getPath());
+                final URI sourceUri = sourcePath.toUri();
+                final FileSystem fs = FileSystem.get(sourceUri, hadoopConfiguration);
+                fs.copyFromLocalFile(sourcePath, jarFile);
             }
+            try {
+                DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, defaultFileSystem);
+            } catch (final Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
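
With this change, GiraphGraphComputer#loadJar copies each jar into hadoop-gremlin-<version>-libs under the submitting user's home directory on the default FileSystem before registering it with the DistributedCache. A small, hypothetical verification sketch that lists what ended up there (the class name and the plain new Configuration() setup are assumptions, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.tinkerpop.gremlin.util.Gremlin;

    public class ListUploadedGremlinLibs {
        public static void main(final String[] args) throws Exception {
            // picks up core-site.xml/hdfs-site.xml from the classpath
            final Configuration conf = new Configuration();
            final FileSystem fs = FileSystem.get(conf);
            // same target directory GiraphGraphComputer#loadJar writes to
            final Path libs = new Path(fs.getHomeDirectory(), "hadoop-gremlin-" + Gremlin.version() + "-libs");
            if (fs.exists(libs)) {
                for (final FileStatus status : fs.listStatus(libs)) {
                    System.out.println(status.getPath());
                }
            }
        }
    }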
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/94603501/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index a05a1be..f5f332d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -41,15 +42,22 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public abstract class AbstractHadoopGraphComputer implements GraphComputer {
 
+    private final static Pattern PATH_PATTERN = Pattern.compile("([^:]|://)+");
+
     protected final Logger logger;
     protected final HadoopGraph hadoopGraph;
     protected boolean executed = false;
@@ -139,6 +147,44 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
             throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());
     }
 
+    protected void loadJars(final Configuration hadoopConfiguration, final Object... params) {
+        if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
+            final String hadoopGremlinLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
+            if (null == hadoopGremlinLibs)
+                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            else {
+                try {
+                    final Matcher matcher = PATH_PATTERN.matcher(hadoopGremlinLibs);
+                    while (matcher.find()) {
+                        final String path = matcher.group();
+                        FileSystem fs;
+                        try {
+                            final URI uri = new URI(path);
+                            fs = FileSystem.get(uri, hadoopConfiguration);
+                        } catch (URISyntaxException e) {
+                            fs = FileSystem.get(hadoopConfiguration);
+                        }
+                        final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
+                        if (file.exists()) {
+                            for (final File f : file.listFiles()) {
+                                if (f.getName().endsWith(Constants.DOT_JAR)) {
+                                    loadJar(hadoopConfiguration, f, params);
+                                }
+                            }
+                        }
+                        else
+                            this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+                    }
+                } catch (IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    protected abstract void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params)
+            throws IOException;
+
     @Override
     public Features features() {
         return new Features();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/94603501/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9e05e53..6e09835 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -78,7 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -209,7 +208,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
             try {
                 final JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-                this.loadJars(sparkContext, hadoopConfiguration); // add the project jars to the cluster
+                this.loadJars(hadoopConfiguration, sparkContext); // add the project jars to the cluster
                 Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
@@ -384,27 +383,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
     /////////////////
 
-    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
-        if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String hadoopGremlinLocalLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == hadoopGremlinLocalLibs)
-                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
-            else {
-                try {
-                    final String[] paths = hadoopGremlinLocalLibs.split(":");
-                    final FileSystem fs = FileSystem.get(hadoopConfiguration);
-                    for (final String path : paths) {
-                        final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
-                        if (file.exists())
-                            Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
-                        else
-                            this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
-                    }
-                } catch (IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            }
-        }
+    @Override
+    protected void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params) {
+        final JavaSparkContext sparkContext = (JavaSparkContext) params[0];
+        sparkContext.addJar(file.getAbsolutePath());
     }
 
     /**

