gora-commits mailing list archives

From kam...@apache.org
Subject svn commit: r1710396 - in /gora/site/trunk/content/current: gora-core.md tutorial.md
Date Sat, 24 Oct 2015 22:35:23 GMT
Author: kamaci
Date: Sat Oct 24 22:35:23 2015
New Revision: 1710396

URL: http://svn.apache.org/viewvc?rev=1710396&view=rev
Log:
GoraSparkEngine example added to tutorial.

Modified:
    gora/site/trunk/content/current/gora-core.md
    gora/site/trunk/content/current/tutorial.md

Modified: gora/site/trunk/content/current/gora-core.md
URL: http://svn.apache.org/viewvc/gora/site/trunk/content/current/gora-core.md?rev=1710396&r1=1710395&r2=1710396&view=diff
==============================================================================
--- gora/site/trunk/content/current/gora-core.md (original)
+++ gora/site/trunk/content/current/gora-core.md Sat Oct 24 22:35:23 2015
@@ -124,7 +124,7 @@ In the stores covered within the gora-co
 
 #GoraSparkEngine
 ##Description
-GoraSparkEngine is Spark backend of Apache Gora. Assume that input and output data stores are:
+GoraSparkEngine is the Spark backend of Gora. Assume that the input and output data stores are:
 
     DataStore<K1, V1> inStore;
     DataStore<K2, V2> outStore;

Modified: gora/site/trunk/content/current/tutorial.md
URL: http://svn.apache.org/viewvc/gora/site/trunk/content/current/tutorial.md?rev=1710396&r1=1710395&r2=1710396&view=diff
==============================================================================
--- gora/site/trunk/content/current/tutorial.md (original)
+++ gora/site/trunk/content/current/tutorial.md Sat Oct 24 22:35:23 2015
@@ -975,6 +975,129 @@ The outputs of the job will be saved in
      1236902400000
     1 row(s) in 0.0490 seconds
 
+##Spark Backend
+In this tutorial, the log analytics example is implemented with GoraSparkEngine to demonstrate the Spark backend of Gora.
+Data will be read from HBase, map/reduce functions will be run, and the result will be written into Solr (version 4.10.3).
+The whole process runs on Spark.
+
+Persist data into HBase as described in [Log analytics in MapReduce](/current/tutorial.html#log-analytics-in-mapreduce).
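+
+If you want to load a record or two by hand while testing, a put through the Gora API looks roughly like the sketch below. This is a minimal illustration, not the tutorial's loader: the no-arg constructor and the `setUrl`/`setTimestamp` setters are assumed from the generated Pageview bean described in the linked section.
+
+    // Hypothetical manual load of a single Pageview record (sketch only).
+    DataStore<Long, Pageview> store =
+        DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration());
+    Pageview pageview = new Pageview();
+    pageview.setUrl("/index.html");                    // field assumed from the Pageview schema
+    pageview.setTimestamp(System.currentTimeMillis()); // field assumed from the Pageview schema
+    store.put(1L, pageview);                           // key type matches the store's Long key
+    store.flush();
+    store.close();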
+
+To write the result into Solr, create a schemaless core named Metrics. The easiest way is to rename the default collection1 core, located in the `solr-4.10.3/example/example-schemaless/solr` folder, to Metrics and edit `solr-4.10.3/example/example-schemaless/solr/Metrics/core.properties` as follows:
+
+    name=Metrics
+
+Then run the start command for Solr:
+
+    solr-4.10.3/example$ java -Dsolr.solr.home=example-schemaless/solr/ -jar start.jar
+
+Read data from HBase, generate some metrics, and write the results into Solr with Spark via Gora. Here is how to initialize the input and output data stores:
+
+    public int run(String[] args) throws Exception {
+      DataStore<Long, Pageview> inStore;
+      DataStore<String, MetricDatum> outStore;
+      Configuration hadoopConf = new Configuration();
+      if (args.length > 0) {
+        String dataStoreClass = args[0];
+        inStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, hadoopConf);
+        if (args.length > 1) {
+          dataStoreClass = args[1];
+        }
+        outStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, MetricDatum.class, hadoopConf);
+      } else {
+        inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
+        outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
+      }
+      ...
+    }
+
+Pass the input data store’s key and value classes to instantiate a GoraSparkEngine:
+
+    GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class, Pageview.class);
+
+Construct a JavaSparkContext and register the input data store’s value class as a Kryo class:
+
+    SparkConf sparkConf = new SparkConf().setAppName("Gora Spark Integration Application").setMaster("local");
+    Class[] c = new Class[1];
+    c[0] = inStore.getPersistentClass();
+    sparkConf.registerKryoClasses(c);
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+You can get a JavaPairRDD from the input data store:
+
+    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
+
+Once you have it, you can work with it just as if you were writing ordinary Spark code! For example:
+
+    long count = goraRDD.count();
+    System.out.println("Total Log Count: " + count);
+
+Here are the map and reduce functions used in this example:
+
+    /** The number of milliseconds in a day */
+    private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
+    /**
+     * map function used in calculation
+     */
+    private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc =
+        new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
+      @Override
+      public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+        String url = pageview.getUrl().toString();
+        Long day = getDay(pageview.getTimestamp());
+        Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
+        return new Tuple2<>(keyTuple, 1L);
+      }
+    };
+
+    /**
+     * reduce function used in calculation
+     */
+    private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+      @Override
+      public Long call(Long aLong, Long aLong2) throws Exception {
+        return aLong + aLong2;
+      }
+    };
+
+    /**
+     * metric function used after map phase
+     */
+    private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc =
+        new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
+      @Override
+      public Tuple2<String, MetricDatum> call(Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2)
+          throws Exception {
+        String dimension = tuple2LongTuple2._1()._1();
+        long timestamp = tuple2LongTuple2._1()._2();
+        MetricDatum metricDatum = new MetricDatum();
+        metricDatum.setMetricDimension(dimension);
+        metricDatum.setTimestamp(timestamp);
+        String key = metricDatum.getMetricDimension().toString();
+        key += "_" + Long.toString(timestamp);
+        metricDatum.setMetric(tuple2LongTuple2._2());
+        return new Tuple2<>(key, metricDatum);
+      }
+    };
+
+    /**
+     * Rolls up the given timestamp to the day cardinality, so that data can be
+     * aggregated daily
+     */
+    private static long getDay(long timeStamp) {
+      return (timeStamp / DAY_MILIS) * DAY_MILIS;
+    }
+
+Here is how to run the map and reduce functions on the existing JavaPairRDD:
+
+    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
+    JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
+
+To persist the result into the output data store (in our example, Solr), do the following:
+
+    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
+    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
+
+That’s all! You can check Solr to verify the results.
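+
+For a quick sanity check from Java, a minimal SolrJ query against the Metrics core might look like this sketch (assuming Solr is still running on its default port 8983; exception handling is omitted for brevity):
+
+    import org.apache.solr.client.solrj.SolrQuery;
+    import org.apache.solr.client.solrj.impl.HttpSolrServer;
+    import org.apache.solr.client.solrj.response.QueryResponse;
+
+    // Query every document in the Metrics core and print how many metrics were indexed.
+    HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr/Metrics");
+    QueryResponse response = server.query(new SolrQuery("*:*"));
+    System.out.println("Indexed metrics: " + response.getResults().getNumFound());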
+
 ##More Examples
 Other than this tutorial, there are several places that you can find 
 examples of Gora in action.


