Return-Path: X-Original-To: apmail-gora-commits-archive@www.apache.org Delivered-To: apmail-gora-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6D09510612 for ; Thu, 3 Sep 2015 07:28:38 +0000 (UTC) Received: (qmail 21888 invoked by uid 500); 3 Sep 2015 07:28:38 -0000 Delivered-To: apmail-gora-commits-archive@gora.apache.org Received: (qmail 21811 invoked by uid 500); 3 Sep 2015 07:28:38 -0000 Mailing-List: contact commits-help@gora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gora.apache.org Delivered-To: mailing list commits@gora.apache.org Received: (qmail 21768 invoked by uid 99); 3 Sep 2015 07:28:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Sep 2015 07:28:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0ED89E0061; Thu, 3 Sep 2015 07:28:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lewismc@apache.org To: commits@gora.apache.org Date: Thu, 03 Sep 2015 07:28:38 -0000 Message-Id: <27d53359ddce4ddd84fe8892584ea4e6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/25] gora git commit: * GoraSpark.java initialize method renamed to initializeInput. * Architectural change is made. * GoraSpark.java initialize method renamed to initializeInput. * Architectural change is made. 
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ef68cead Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ef68cead Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ef68cead Branch: refs/heads/master Commit: ef68cead273324797cf292dbe6da18ee3fd819cb Parents: 9c2d225 Author: Furkan KAMACI Authored: Sun Jun 28 17:17:52 2015 +0300 Committer: Furkan KAMACI Committed: Sun Jun 28 17:17:52 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/gora/spark/GoraSpark.java | 50 +++++++++++++++----- .../gora/tutorial/log/LogAnalyticsSpark.java | 44 ++++++++--------- 2 files changed, 60 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java index 690e32c..4279cfb 100644 --- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java +++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java @@ -24,6 +24,7 @@ import org.apache.gora.mapreduce.GoraMapReduceUtils; import org.apache.gora.persistency.Persistent; import org.apache.gora.store.DataStore; import org.apache.gora.util.IOUtils; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -37,19 +38,44 @@ public class GoraSpark { public GoraSpark(Class clazzK, Class clazzV) { this.clazzK = clazzK; - this.clazzV = clazzV; + this.clazzV = clazzV; } - public JavaPairRDD initialize(JavaSparkContext sparkContext, + public JavaPairRDD initializeInput(JavaSparkContext sparkContext, Configuration conf, 
DataStore dataStore) { - GoraMapReduceUtils.setIOSerializations(conf, true); - try { - IOUtils.storeToConf(dataStore.newQuery(), conf, - GoraInputFormat.QUERY_KEY); - } catch (IOException ioex) { - throw new RuntimeException(ioex.getMessage()); - } - return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, - clazzK, clazzV); - } + GoraMapReduceUtils.setIOSerializations(conf, true); + + try { + IOUtils + .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY); + } catch (IOException ioex) { + throw new RuntimeException(ioex.getMessage()); + } + + return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK, + clazzV); + } + + public JavaPairRDD initializeInput(JavaSparkContext sparkContext, + DataStore dataStore) { + Configuration hadoopConf; + + if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) { + hadoopConf = ((Configurable) dataStore).getConf(); + } else { + hadoopConf = new Configuration(); + } + + GoraMapReduceUtils.setIOSerializations(hadoopConf, true); + + try { + IOUtils.storeToConf(dataStore.newQuery(), hadoopConf, + GoraInputFormat.QUERY_KEY); + } catch (IOException ioex) { + throw new RuntimeException(ioex.getMessage()); + } + + return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class, + clazzK, clazzV); + } } http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index 214b130..9b28fd7 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -21,44 +21,44 @@ import org.apache.gora.spark.GoraSpark; import 
org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; import org.apache.gora.tutorial.log.generated.Pageview; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -public class LogAnalyticsSpark extends Configured implements Tool { +public class LogAnalyticsSpark { private static final String USAGE = "LogAnalyticsSpark "; - private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark(); public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println(USAGE); - System.exit(1); + if (args.length < 2) { + System.err.println(USAGE); + System.exit(1); } - // run as any other MR job - int ret = ToolRunner.run(logAnalyticsSpark, args); - System.exit(ret); + + String inStoreClass = args[0]; + String outStoreClass = args[1]; + + LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark(); + int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass); + + System.exit(ret); } - @Override - public int run(String[] args) throws Exception { - GoraSpark goraSpark = new GoraSpark( - Long.class, Pageview.class); + public int run(String inStoreClass, String outStoreClass) throws Exception { + GoraSpark goraSpark = new GoraSpark<>(Long.class, + Pageview.class); - SparkConf conf = new SparkConf().setAppName( + SparkConf sparkConf = new SparkConf().setAppName( "Gora Integration Application").setMaster("local"); - JavaSparkContext sc = new JavaSparkContext(conf); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + + Configuration hadoopConf = new Configuration(); - String dataStoreClass = args[0]; DataStore dataStore = DataStoreFactory.getDataStore( - dataStoreClass, Long.class, Pageview.class, - logAnalyticsSpark.getConf()); + inStoreClass, Long.class, 
Pageview.class, hadoopConf); - JavaPairRDD<Long, Pageview> goraRDD = goraSpark - .initialize(sc, logAnalyticsSpark.getConf(), dataStore); + JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore); // JavaPairRDD // cachedGoraRdd = goraRDD.cache();