gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [01/25] gora git commit: * JavaPairRDD support for GoraInputFormat.
Date Thu, 03 Sep 2015 07:28:37 GMT
Repository: gora
Updated Branches:
  refs/heads/master 130257370 -> ea44388f9


* JavaPairRDD support for GoraInputFormat.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/9c2d225d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/9c2d225d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/9c2d225d

Branch: refs/heads/master
Commit: 9c2d225d04cfa746244373fa661a1aa6f03250bb
Parents: bb09d89
Author: Furkan KAMACI <furkankamaci@gmail.com>
Authored: Sun Jun 28 01:57:51 2015 +0300
Committer: Furkan KAMACI <furkankamaci@gmail.com>
Committed: Sun Jun 28 01:57:51 2015 +0300

----------------------------------------------------------------------
 gora-core/pom.xml                               |  7 ++
 .../java/org/apache/gora/spark/GoraSpark.java   | 55 ++++++++++++++++
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 69 ++++++++++++++++++++
 pom.xml                                         |  4 ++
 4 files changed, 135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index eab5330..5f147fb 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -141,6 +141,13 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <!-- Spark dependency -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>1.3.1</version>
+    </dependency>
+
     <!-- Logging Dependencies -->
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
new file mode 100644
index 0000000..690e32c
--- /dev/null
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.spark;
+
+import java.io.IOException;
+
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Base class for Spark integration: exposes a Gora {@link DataStore} to Spark
+ * as a {@link JavaPairRDD} read through {@link GoraInputFormat}.
+ *
+ * @param <K> key class of the data store
+ * @param <V> persistent value class of the data store
+ */
+public class GoraSpark<K, V extends Persistent> {
+  /** Key class handed to Spark when the RDD is created. */
+  final Class<K> clazzK;
+  /** Value class handed to Spark when the RDD is created. */
+  final Class<V> clazzV;
+
+  /**
+   * Creates a Spark adapter for a data store with the given key/value classes.
+   *
+   * @param clazzK key class of the data store
+   * @param clazzV persistent value class of the data store
+   */
+  public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
+    this.clazzK = clazzK;
+    this.clazzV = clazzV;
+  }
+
+  /**
+   * Builds a {@link JavaPairRDD} backed by {@link GoraInputFormat} over the
+   * query returned by {@code dataStore.newQuery()}.
+   * <p>
+   * Note that {@code conf} is modified in place: it is passed to
+   * {@link GoraMapReduceUtils#setIOSerializations} and the query is stored in
+   * it under {@link GoraInputFormat#QUERY_KEY}.
+   *
+   * @param sparkContext Spark context used to create the RDD
+   * @param conf Hadoop configuration handed to the input format (mutated)
+   * @param dataStore data store to read from
+   * @return pair RDD of the data store's keys and persistent values
+   * @throws RuntimeException if the query cannot be stored in {@code conf};
+   *         the underlying {@link IOException} is kept as the cause
+   */
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+      Configuration conf, DataStore<K, V> dataStore) {
+    GoraMapReduceUtils.setIOSerializations(conf, true);
+    try {
+      IOUtils.storeToConf(dataStore.newQuery(), conf,
+          GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      // Keep the original exception as the cause so its stack trace survives.
+      throw new RuntimeException(ioex.getMessage(), ioex);
+    }
+    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
+        clazzK, clazzV);
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
new file mode 100644
index 0000000..214b130
--- /dev/null
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.tutorial.log;
+
+import org.apache.gora.spark.GoraSpark;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.Pageview;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Tutorial Spark job that reads {@link Pageview} records from a Gora data
+ * store through {@link GoraSpark} and prints how many records there are.
+ */
+public class LogAnalyticsSpark extends Configured implements Tool {
+
+  private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
+
+  /**
+   * Command-line entry point; delegates to {@link #run(String[])} through
+   * {@link ToolRunner} and exits with the code it returns.
+   *
+   * @param args {@code <input_data_store> <output_data_store>}; only the
+   *        first argument is currently read by {@link #run(String[])}
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println(USAGE);
+      System.exit(1);
+    }
+    // ToolRunner applies generic Hadoop options and sets this tool's Configuration.
+    int ret = ToolRunner.run(new LogAnalyticsSpark(), args);
+    System.exit(ret);
+  }
+
+  /**
+   * Counts the {@link Pageview} records in the input data store and prints
+   * the total.
+   *
+   * @param args {@code args[0]} is the data store class name
+   * @return 0 on success (used as the process exit code by {@link #main})
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
+        Long.class, Pageview.class);
+
+    // Runs Spark in local mode; change the master URL to target a cluster.
+    SparkConf sparkConf = new SparkConf().setAppName(
+        "Gora Integration Application").setMaster("local");
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+    DataStore<Long, Pageview> dataStore = null;
+    try {
+      String dataStoreClass = args[0];
+      dataStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class,
+          Pageview.class, getConf());
+
+      JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
+          getConf(), dataStore);
+
+      long count = goraRDD.count();
+      System.out.println("Total Count: " + count);
+      return 0;
+    } finally {
+      // Release the store and the Spark context even if the job fails.
+      if (dataStore != null) {
+        dataStore.close();
+      }
+      sc.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01a7fee..a6261ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -893,6 +893,10 @@
             <groupId>ant</groupId>
             <artifactId>ant</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>org.mortbay.jetty</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
 


Mime
View raw message