tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] TEZ-1465. Update and document IntersectExample. Change name to JoinExample (bikas)
Date Wed, 20 Aug 2014 00:09:58 GMT
Repository: tez
Updated Branches:
  refs/heads/master fbca9f4c0 -> b5497d7c1


http://git-wip-us.apache.org/repos/asf/tez/blob/b5497d7c/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
index 52cff91..ba69077 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -59,6 +60,11 @@ import org.apache.tez.runtime.library.processor.SimpleProcessor;
  * of occurrences of a word in a distributed text data set.
  */
 public class WordCount extends Configured implements Tool {
+
+  static String INPUT = "Input";
+  static String OUTPUT = "Output";
+  static String TOKENIZER = "Tokenizer";
+  static String SUMMATION = "Summation";
   
   /*
    * Example code to write a processor in Tez.
@@ -84,8 +90,9 @@ public class WordCount extends Configured implements Tool {
       // of casting the input/output. This allows the actual input/output type to be replaced
       // without affecting the semantic guarantees of the data type that are represented
by
       // the reader and writer.
-      KeyValueReader kvReader = (KeyValueReader) getInputs().values().iterator().next().getReader();
-      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      // The inputs/outputs are referenced via the names assigned in the DAG.
+      KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(SUMMATION).getWriter();
       while (kvReader.next()) {
         StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
         while (itr.hasMoreTokens()) {
@@ -116,11 +123,11 @@ public class WordCount extends Configured implements Tool {
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 1);
       Preconditions.checkArgument(getOutputs().size() == 1);
-      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
       // The KeyValues reader provides all values for a given key. The aggregation of values
per key
       // is done by the LogicalInput. Since the key is the word and the values are its counts
in 
       // the different TokenProcessors, summing all values per key provides the sum for that
word.
-      KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next().getReader();
+      KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
       while (kvReader.next()) {
         Text word = (Text) kvReader.getCurrentKey();
         int sum = 0;
@@ -150,8 +157,8 @@ public class WordCount extends Configured implements Tool {
     // Create a vertex that reads the data from the data source and tokenizes it using the

     // TokenProcessor. The number of tasks that will do the work for this vertex will be
decided 
     // using the information provided by the data source descriptor.
-    Vertex tokenizerVertex = Vertex.create("Tokenizer", ProcessorDescriptor.create(
-        TokenProcessor.class.getName())).addDataSource("Input", dataSource);
+    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
+        TokenProcessor.class.getName())).addDataSource(INPUT, dataSource);
 
     // Create the edge that represents the movement and semantics of data between the producer

     // Tokenizer vertex and the consumer Summation vertex. In order to perform the summation
in 
@@ -172,9 +179,9 @@ public class WordCount extends Configured implements Tool {
     // The number of tasks that do the work of this vertex depends on the number of partitions
used 
     // to distribute the sum processing. In this case, its been made configurable via the

     // numPartitions parameter.
-    Vertex summationVertex = Vertex.create("Summation",
+    Vertex summationVertex = Vertex.create(SUMMATION,
         ProcessorDescriptor.create(SumProcessor.class.getName()), numPartitions)
-        .addDataSink("Output", dataSink);
+        .addDataSink(OUTPUT, dataSink);
 
     // No need to add jar containing this class as assumed to be part of the Tez jars. Otherwise

     // we would have to add the jars for this code as local files to the vertices.
@@ -202,6 +209,8 @@ public class WordCount extends Configured implements Tool {
     } else {
       tezConf = new TezConfiguration();
     }
+    
+    UserGroupInformation.setConfiguration(tezConf);
 
     // Create the TezClient to submit the DAG. Pass the tezConf that has all necessary global
and 
     // dag specific configurations

http://git-wip-us.apache.org/repos/asf/tez/blob/b5497d7c/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 736c54b..1fbacdf 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -51,9 +51,9 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.examples.SimpleSessionExample;
-import org.apache.tez.examples.IntersectDataGen;
-import org.apache.tez.examples.IntersectExample;
-import org.apache.tez.examples.IntersectValidate;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.examples.JoinExample;
+import org.apache.tez.examples.JoinValidate;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -118,7 +118,7 @@ public class TestTezJobs {
 
   @Test(timeout = 60000)
   public void testIntersectExample() throws Exception {
-    IntersectExample intersectExample = new IntersectExample();
+    JoinExample intersectExample = new JoinExample();
     intersectExample.setConf(new Configuration(mrrTezCluster.getConfig()));
     Path stagingDirPath = new Path("/tmp/tez-staging-dir");
     Path inPath1 = new Path("/tmp/inPath1");
@@ -192,18 +192,18 @@ public class TestTezJobs {
       tezSession = TezClient.create("IntersectExampleSession", tezConf);
       tezSession.start();
 
-      IntersectDataGen dataGen = new IntersectDataGen();
+      JoinDataGen dataGen = new JoinDataGen();
       String[] dataGenArgs = new String[] {
           dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
           expectedOutputPath.toString(), "2" };
       assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
 
-      IntersectExample intersect = new IntersectExample();
+      JoinExample intersect = new JoinExample();
       String[] intersectArgs = new String[] {
           dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
       assertEquals(0, intersect.run(tezConf, intersectArgs, tezSession));
 
-      IntersectValidate intersectValidate = new IntersectValidate();
+      JoinValidate intersectValidate = new JoinValidate();
       String[] intersectValidateArgs = new String[] {
           expectedOutputPath.toString(), outPath.toString(), "3" };
       assertEquals(0, intersectValidate.run(tezConf, intersectValidateArgs, tezSession));


Mime
View raw message