gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [gora] sneceesay77 commented on a change in pull request #161: GORA-565: Enable Spark in Unit Tests
Date Thu, 18 Apr 2019 18:44:04 GMT
sneceesay77 commented on a change in pull request #161: GORA-565: Enable Spark in Unit Tests
URL: https://github.com/apache/gora/pull/161#discussion_r276787830
 
 

 ##########
 File path: gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
 ##########
 @@ -31,122 +34,151 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.util.Map;
+import scala.Tuple2;
 
 /**
  * Classic word count example in Gora with Spark.
  */
 public class SparkWordCount {
-  private static final Logger log = LoggerFactory.getLogger(SparkWordCount.class);
-
-  private static final String USAGE = "SparkWordCount <input_data_store> <output_data_store>";
-
-  /**
-   * map function used in calculation
-   */
-  private static Function<WebPage, Tuple2<String, Long>> mapFunc =
-    new Function<WebPage, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> call(WebPage webPage)
-                throws Exception {
-          String content = new String(webPage.getContent().array(), Charset.defaultCharset());
-          return new Tuple2<>(content, 1L);
-        }
-  };
-
-  /**
-   * reduce function used in calculation
-   */
-  private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long,
Long>() {
-    @Override
-    public Long call(Long aLong, Long aLong2) throws Exception {
-      return aLong + aLong2;
-    }
-  };
-
-  public int wordCount(DataStore<String,WebPage> inStore,
-    DataStore<String, TokenDatum> outStore) throws IOException {
-
-    //Spark engine initialization
-    GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class,
-       WebPage.class);
-
-    SparkConf sparkConf = new SparkConf().setAppName(
-      "Gora Spark Word Count Application").setMaster("local");
-
-    Class[] c = new Class[1];
-    c[0] = inStore.getPersistentClass();
-    sparkConf.registerKryoClasses(c);
-    //
-    JavaSparkContext sc = new JavaSparkContext(sparkConf);
-
-    JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
-
-    long count = goraRDD.count();
-    log.info("Total Web page count: {}", count);
-
-    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
-
-    JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
-
-    //Print output for debug purpose
-    log.info("SparkWordCount debug purpose TokenDatum print starts:");
-    Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
-    for (String key : tokenDatumMap.keySet()) {
-      log.info(key);
-      log.info(tokenDatumMap.get(key).toString());
-    }
-    log.info("SparkWordCount debug purpose TokenDatum print ends:");
-    //
-
-    //write output to datastore
-    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
-    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
-    //
-
-    return 1;
-  }
-
-  public int run(String[] args) throws Exception {
-
-    DataStore<String,WebPage> inStore;
-    DataStore<String, TokenDatum> outStore;
-    Configuration hadoopConf = new Configuration();
-    if(args.length > 0) {
-      String dataStoreClass = args[0];
-      inStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class,
hadoopConf);
-      if(args.length > 1) {
-        dataStoreClass = args[1];
-      }
-      outStore = DataStoreFactory.getDataStore(dataStoreClass,
-        String.class, TokenDatum.class, hadoopConf);
-      } else {
-        inStore = DataStoreFactory.getDataStore(String.class, WebPage.class, hadoopConf);
-        outStore = DataStoreFactory.getDataStore(String.class, TokenDatum.class, hadoopConf);
-      }
-
-      return wordCount(inStore, outStore);
-  }
-
-  public static void main(String[] args) throws Exception {
-
-    if (args.length < 2) {
-      log.info(USAGE);
-      System.exit(1);
-    }
-
-    SparkWordCount sparkWordCount = new SparkWordCount();
-
-    try {
-      int ret = sparkWordCount.run(args);
-      System.exit(ret);
-    } catch (Exception ex){
-      log.error("Error occurred!");
-    }
-  }
+	private static final Logger log = LoggerFactory.getLogger(SparkWordCount.class);
+
+	private static final String USAGE = "SparkWordCount <input_data_store> <output_data_store>";
+
+	/**
+	 * This method would flattened WebPage data and return an Iterable list of
+	 * words. The map Function would use this as an input.
+	 */
+	private static Function<WebPage, Iterable<String>> flatMapFun = new Function<WebPage,
Iterable<String>>() {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> call(WebPage v1) throws Exception {
+			String content = new String(v1.getContent().array(), Charset.defaultCharset());
+			return Arrays.asList(content.split(" "));
+		}
+	};
+
+	/**
+	 * Map function used to map out each word with a count of 1
+	 */
+	private static Function<String, Tuple2<String, Long>> mapFunc = new Function<String,
Tuple2<String, Long>>() {
+		@Override
+		public Tuple2<String, Long> call(String s) throws Exception {
+			return new Tuple2<>(s, 1L);
+		}
+	};
+
+	/**
+	 * reduce function used in calculation
+	 */
+	private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long,
Long>() {
+		@Override
+		public Long call(Long aLong, Long aLong2) throws Exception {
+			return aLong + aLong2;
+		}
+	};
+
+	/**
+	 * Convert the key value pair <String, Long> to <String, TokenDatum> as per
the specification in the mapping file
+	 */
+	private static PairFunction<Tuple2<String, Long>, String, TokenDatum> metricFunc
= new PairFunction<Tuple2<String, Long>, String, TokenDatum>() {
+		@Override
+		public Tuple2<String, TokenDatum> call(Tuple2<String, Long> line) throws Exception
{
+			String word = line._1();
+			TokenDatum tDatum = new TokenDatum();
+			tDatum.setCount(line._2.intValue());
+			return new Tuple2<>(word, tDatum);
+		}
+	};
+
+	public int wordCount(DataStore<String, WebPage> inStore, DataStore<String, TokenDatum>
outStore)
+			throws IOException {
+
+		// Spark engine initialization
+		GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class,
WebPage.class);
+
+		SparkConf sparkConf = new SparkConf().setAppName("Gora Spark Word Count Application").setMaster("local");
+
+		Class[] c = new Class[1];
+		c[0] = inStore.getPersistentClass();
+		sparkConf.registerKryoClasses(c);
+		//
+		JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+		JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
+
+		JavaPairRDD<String, String> goraRDDFlatenned = goraRDD.flatMapValues(flatMapFun);
+
+		// Map<String, WebPage> debug = goraRDD.collectAsMap();
+		// for(WebPage k: debug.values()){
+		// System.out.println(k);
+		// //System.out.println(debug.get(k).getUrl());
+		// }
+
+		long count = goraRDD.count();
+		log.info("Total Web page count: {}", count);
+
+		JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDDFlatenned.values().map(mapFunc);
+		System.out.println(mappedGoraRdd.collect());
+
+		JavaPairRDD<String, TokenDatum> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc)
+				.mapToPair(metricFunc);
+
+		// Print output for debug purpose
+		log.info("SparkWordCount debug purpose TokenDatum print starts:");
+		Map<String, TokenDatum> tokenDatumMap = reducedGoraRdd.collectAsMap();
+		for (String key : tokenDatumMap.keySet()) {
+			log.info(key);
+			log.info(tokenDatumMap.get(key).toString());
+		}
+		log.info("SparkWordCount debug purpose TokenDatum print ends:");
+
+		// write output to datastore
+		System.out.println(reducedGoraRdd.collect());
+		Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
+		reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
+		System.out.println(outStore.get("d"));
 
 Review comment:
   All println() debug messages have been removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message