gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [11/25] gora git commit: * GoraSparkEngine.java architecture is improved.
Date Thu, 03 Sep 2015 07:28:47 GMT
* GoraSparkEngine.java architecture is improved.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/62be0c31
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/62be0c31
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/62be0c31

Branch: refs/heads/master
Commit: 62be0c312927e3ea4962eb966689b49dc1fcebce
Parents: 2195570
Author: Furkan KAMACI <furkankamaci@gmail.com>
Authored: Mon Aug 17 22:25:12 2015 +0300
Committer: Furkan KAMACI <furkankamaci@gmail.com>
Committed: Mon Aug 17 22:25:12 2015 +0300

----------------------------------------------------------------------
 .../org/apache/gora/spark/GoraSparkEngine.java  | 41 ++++++++++++++++++++
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 19 ++-------
 2 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index ced44be..9c64542 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -21,11 +21,13 @@ import java.io.IOException;
 
 import org.apache.gora.mapreduce.GoraInputFormat;
 import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraOutputFormat;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.util.IOUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -93,4 +95,43 @@ public class GoraSparkEngine<K, V extends Persistent> {
 
     return initialize(sparkContext, hadoopConf, dataStore);
   }
+
+    /**
+     * Sets the output parameters for the job
+     * @param job the job to set the properties for
+     * @param dataStore the datastore as the output
+     * @param reuseObjects whether to reuse objects in serialization
+     */
+    public <K, V extends Persistent> Configuration setOutput(Job job,
+        DataStore<K, V> dataStore, boolean reuseObjects) {
+      return setOutput(job, dataStore.getClass(), dataStore.getKeyClass(),
+           dataStore.getPersistentClass(), reuseObjects);
+    }
+
+    /**
+     * Sets the output parameters for the job
+     *
+     * @param job             the job to set the properties for
+     * @param dataStoreClass  the datastore class
+     * @param keyClass        output key class
+     * @param persistentClass output value class
+     * @param reuseObjects    whether to reuse objects in serialization
+     */
+    @SuppressWarnings("rawtypes")
+    public <K, V extends Persistent> Configuration setOutput(Job job,
+        Class<? extends DataStore> dataStoreClass,
+        Class<K> keyClass, Class<V> persistentClass,
+        boolean reuseObjects) {
+
+      job.setOutputFormatClass(GoraOutputFormat.class);
+      job.setOutputKeyClass(keyClass);
+      job.setOutputValueClass(persistentClass);
+
+      job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, dataStoreClass,
+              DataStore.class);
+      job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, keyClass, Object.class);
+      job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
+              persistentClass, Persistent.class);
+      return job.getConfiguration();
+    }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 327bf7f..d441de7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -63,7 +63,6 @@ public class LogAnalyticsSpark {
   /** The number of milliseconds in a day */
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
-  // todo _fk consider using Kyro serialization
   /**
    * map function used in calculation
    */
@@ -173,9 +172,6 @@ public class LogAnalyticsSpark {
     long count = goraRDD.count();
     System.out.println("Total Log Count: " + count);
 
-    String firstOneURL = goraRDD.first()._2().getUrl().toString();
-    System.out.println("First entry's first URL:" + firstOneURL);
-
     JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
         .values().map(mapFunc);
 
@@ -184,7 +180,7 @@ public class LogAnalyticsSpark {
 
     System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
-    //print screen output
+    //Print output for debug purpose
     /*
     Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
     for (String key : metricDatumMap.keySet()) {
@@ -195,19 +191,10 @@ public class LogAnalyticsSpark {
 
     //write output to datastore
     GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-
     Job job = Job.getInstance(hadoopConf);
-    job.setOutputFormatClass(GoraOutputFormat.class);
-    job.setOutputKeyClass(outStore.getKeyClass());
-    job.setOutputValueClass(outStore.getPersistentClass());
-
-    job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, outStore.getClass(),
-              DataStore.class);
-    job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, outStore.getKeyClass(), Object.class);
-    job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
-            outStore.getPersistentClass(), Persistent.class);
 
-    reducedGoraRdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
+    Configuration sparkHadoopConf = goraSparkEngine.setOutput(job, outStore, true);
+    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
     //
 
     inStore.close();


Mime
View raw message