tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [05/13] incubator-tinkerpop git commit: HadoopElementIterator now supports ANY InputFormat, not just FileInputFormat. Sweet. Also, if you are using an RDD in Spark (and thus, not really doing Hadoop InputFormat stuffs), we have InputRDDFormat which wraps
Date Thu, 03 Dec 2015 19:49:24 GMT
HadoopElementIterator now supports ANY InputFormat, not just FileInputFormat. Sweet. Also,
if you are using an RDD in Spark (and thus, not really doing Hadoop InputFormat stuffs), we
have InputRDDFormat which wraps an RDD in an InputFormat so HadoopElementIterator works as
well. This solves the HadoopGraph OLTP problem for ALL InputFormats and it allows ComputerResultStep
to Attach elements for more than just FileInputFormats. Good stuff.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/85eb63db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/85eb63db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/85eb63db

Branch: refs/heads/master
Commit: 85eb63dbc4855f7f3e944b82af8c9aae03c8aed2
Parents: 5b9de83
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue Dec 1 09:25:14 2015 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Dec 3 12:47:52 2015 -0700

----------------------------------------------------------------------
 .../structure/hdfs/HadoopEdgeIterator.java      |   4 -
 .../structure/hdfs/HadoopElementIterator.java   |  35 +++--
 .../structure/hdfs/HadoopVertexIterator.java    |   4 -
 .../HadoopGraphStructureStandardTest.java       |  35 +++++
 .../io/HadoopGraphStructureStandardTest.java    |  35 -----
 .../spark/structure/io/InputRDDFormat.java      | 136 +++++++++++++++++++
 .../spark/structure/io/InputRDDTest.java        |  21 +++
 7 files changed, 208 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
index acdc0a4..59a4d2c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
@@ -39,10 +39,6 @@ public final class HadoopEdgeIterator extends HadoopElementIterator<Edge>
{
 
     private Iterator<Edge> edgeIterator = Collections.emptyIterator();
 
-    public HadoopEdgeIterator(final HadoopGraph graph, final InputFormat<NullWritable,
VertexWritable> inputFormat, final Path path) throws IOException, InterruptedException
{
-        super(graph, inputFormat, path);
-    }
-
     public HadoopEdgeIterator(final HadoopGraph graph) throws IOException {
         super(graph);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
index a9f287b..f9ffea2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
@@ -19,13 +19,14 @@
 package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -36,35 +37,31 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
+import java.util.UUID;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public abstract class HadoopElementIterator<E extends Element> implements Iterator<E>
{
 
-    // TODO: Generalize so it works for more than just FileFormats.
-
     protected final HadoopGraph graph;
     protected final Queue<RecordReader<NullWritable, VertexWritable>> readers
= new LinkedList<>();
 
-    public HadoopElementIterator(final HadoopGraph graph, final InputFormat<NullWritable,
VertexWritable> inputFormat, final Path path) throws IOException, InterruptedException
{
-        this.graph = graph;
-        final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
-        for (final Path path2 : HDFSTools.getAllFilePaths(FileSystem.get(configuration),
path, HiddenFileFilter.instance())) {
-            this.readers.add(inputFormat.createRecordReader(new FileSplit(path2, 0, Long.MAX_VALUE,
new String[]{}), new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
-        }
-    }
-
     public HadoopElementIterator(final HadoopGraph graph) throws IOException {
         try {
             this.graph = graph;
-            if (this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
{
-                final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
-                final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
-                for (final Path path : HDFSTools.getAllFilePaths(FileSystem.get(configuration),
new Path(graph.configuration().getInputLocation()), HiddenFileFilter.instance())) {
-                    this.readers.add(inputFormat.createRecordReader(new FileSplit(path, 0,
Long.MAX_VALUE, new String[]{}), new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
-                }
+            final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
+            final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
+            if (inputFormat instanceof FileInputFormat) {
+                if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
+                    return; // there is no input location and thus, no data (empty graph)
+                configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation());
+            }
+            final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration,
new JobID(UUID.randomUUID().toString(), 1)));
+            for (final InputSplit split : splits) {
+                this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration,
new TaskAttemptID())));
             }
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
index 8977692..8f13c59 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
@@ -36,10 +36,6 @@ public final class HadoopVertexIterator extends HadoopElementIterator<Vertex>
{
 
     private HadoopVertex nextVertex = null;
 
-    public HadoopVertexIterator(final HadoopGraph graph, final InputFormat<NullWritable,
VertexWritable> inputFormat, final Path path) throws IOException, InterruptedException
{
-        super(graph, inputFormat, path);
-    }
-
     public HadoopVertexIterator(final HadoopGraph graph) throws IOException {
         super(graph);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java
new file mode 100644
index 0000000..459c900
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(StructureStandardSuite.class)
+@GraphProviderClass(provider = HadoopGraphProvider.class, graph = HadoopGraph.class)
+public class HadoopGraphStructureStandardTest {
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java
deleted file mode 100644
index ccb2d64..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.io;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(StructureStandardSuite.class)
-@GraphProviderClass(provider = HadoopGraphProvider.class, graph = HadoopGraph.class)
-public class HadoopGraphStructureStandardTest {
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
new file mode 100644
index 0000000..3952c66
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputRDDFormat extends InputFormat<NullWritable, VertexWritable>
{
+
+    public InputRDDFormat() {
+
+    }
+
+    @Override
+    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException,
InterruptedException {
+        return Collections.singletonList(new InputSplit() {
+            @Override
+            public long getLength() throws IOException, InterruptedException {
+                return 0;
+            }
+
+            @Override
+            public String[] getLocations() throws IOException, InterruptedException {
+                return new String[0];
+            }
+        });
+    }
+
+    @Override
+    public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit
inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
+        try {
+            final org.apache.hadoop.conf.Configuration hadoopConfiguration = taskAttemptContext.getConfiguration();
+            final SparkConf sparkConfiguration = new SparkConf();
+            sparkConfiguration.setAppName(UUID.randomUUID().toString());
+            hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(),
entry.getValue()));
+            InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance();
+            JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+            final Iterator<Tuple2<Object, VertexWritable>> iterator = inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()),
javaSparkContext).toLocalIterator();
+            return new RecordReader<NullWritable, VertexWritable>() {
+                @Override
+                public void initialize(final InputSplit inputSplit, final TaskAttemptContext
taskAttemptContext) throws IOException, InterruptedException {
+
+                }
+
+                @Override
+                public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return iterator.hasNext();
+                }
+
+                @Override
+                public NullWritable getCurrentKey() throws IOException, InterruptedException
{
+                    return NullWritable.get();
+                }
+
+                @Override
+                public VertexWritable getCurrentValue() throws IOException, InterruptedException
{
+                    return iterator.next()._2();
+                }
+
+                @Override
+                public float getProgress() throws IOException, InterruptedException {
+                    return 1.0f; // TODO: make this dynamic (how? it's an iterator.)
+                }
+
+                @Override
+                public void close() throws IOException {
+                    if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT,
false))
+                        javaSparkContext.close();
+                }
+            };
+        } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException
e) {
+            throw new IOException(e.getMessage(), e);
+        }
+
+    }
+
+    /*private static class PartitionInputSplit extends InputSplit {
+
+        private final Partition partition;
+
+        public PartitionInputSplit(final Partition partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 0;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return new String[0];
+        }
+
+        public Partition getPartition() {
+            return this.partition;
+        }
+    }*/
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/85eb63db/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
index 5ba3b12..98a2b9f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -24,6 +24,7 @@ import org.apache.spark.serializer.KryoSerializer;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -52,4 +53,24 @@ public class InputRDDTest {
         assertEquals(123l, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
         assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
     }
+
+    @Test
+    public void shouldSupportHadoopGraphOLTP() {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        GraphTraversalSource g = graph.traversal(); // OLTP;
+        assertEquals("person", g.V().has("age", 29).next().label());
+        assertEquals(Long.valueOf(4), g.V().count().next());
+        assertEquals(Long.valueOf(0), g.E().count().next());
+        assertEquals(Long.valueOf(2), g.V().has("age", P.gt(30)).count().next());
+    }
 }


Mime
View raw message