tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] TEZ-678. Support for union operations via VertexGroup abstraction (bikas)
Date Fri, 31 Jan 2014 22:26:37 GMT
Updated Branches:
  refs/heads/master f55dbfb81 -> 2e2312646


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
new file mode 100644
index 0000000..8a790fd
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.committer.MROutputCommitter;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class UnionExample {
+
+  /**
+   * Map-side processor: tokenizes each input record and emits (word, 1)
+   * pairs to the sorted "checker" output. Tasks belonging to the vertex
+   * group (every vertex except "map3") additionally duplicate each pair
+   * into a "parts" MROutput and commit it when the AM allows.
+   */
+  public static class TokenProcessor implements LogicalIOProcessor {
+    TezProcessorContext context;
+    // Reusable key/value objects to avoid per-record allocation.
+    IntWritable one = new IntWritable(1);
+    Text word = new Text();
+
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+        throws Exception {
+      // Keep the context: run() needs getTaskVertexName() and canCommit().
+      this.context = processorContext;
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+      // No custom event handling needed for this example.
+    }
+
+    @Override
+    public void close() throws Exception {
+      // Nothing to release; inputs/outputs are managed by the framework.
+    }
+
+    @Override
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs) throws Exception {
+      Preconditions.checkArgument(inputs.size() == 1);
+      // "map3" is outside the vertex group, so it only has the "checker"
+      // output; the group members (map1/map2) also carry "parts".
+      boolean inUnion = true;
+      if (context.getTaskVertexName().equals("map3")) {
+        inUnion = false;
+      }
+      Preconditions.checkArgument(outputs.size() == (inUnion ? 2 : 1));
+      Preconditions.checkArgument(outputs.containsKey("checker"));
+      MRInput input = (MRInput) inputs.values().iterator().next();
+      KeyValueReader kvReader = input.getReader();
+      OnFileSortedOutput output = (OnFileSortedOutput) outputs.get("checker");
+      KeyValueWriter kvWriter = output.getWriter();
+      MROutput parts = null;
+      KeyValueWriter partsWriter = null;
+      if (inUnion) {
+        parts = (MROutput) outputs.get("parts");
+        partsWriter = parts.getWriter();
+      }
+      // Emit (word, 1) for every whitespace-separated token of every record.
+      while (kvReader.next()) {
+        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+        while (itr.hasMoreTokens()) {
+          word.set(itr.nextToken());
+          kvWriter.write(word, one);
+          if (inUnion) {
+            partsWriter.write(word, one);
+          }
+        }
+      }
+      if (inUnion) {
+        // Poll until the AM grants permission before committing side output.
+        if (parts.isCommitRequired()) {
+          while (!context.canCommit()) {
+            Thread.sleep(100);
+          }
+          parts.commit();
+        }
+      }
+    }
+    
+  }
+  
+  /**
+   * Reduce-side verifier: consumes the merged "union" group input (map1+map2)
+   * and the separate "map3" input, and checks that for every word the union
+   * count minus twice the map3 count is exactly zero. Writes the distinct
+   * word total to the "union" output and one record per counted unit to
+   * "all-parts", committing both when the AM allows.
+   */
+  public static class UnionProcessor implements LogicalIOProcessor {
+    TezProcessorContext context;
+    IntWritable one = new IntWritable(1);
+    
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+        throws Exception {
+      // Context is retained for the canCommit() polling during commit.
+      this.context = processorContext;
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+      // No custom event handling needed for this example.
+    }
+
+    @Override
+    public void close() throws Exception {
+      // Nothing to release; inputs/outputs are managed by the framework.
+    }
+
+    @Override
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs) throws Exception {
+      // Exactly two logical inputs: the "union" group and plain "map3".
+      Preconditions.checkArgument(inputs.size() == 2);
+      Preconditions.checkArgument(outputs.size() == 2);
+      MROutput out = (MROutput) outputs.get("union");
+      MROutput allParts = (MROutput) outputs.get("all-parts");
+      KeyValueWriter kvWriter = out.getWriter();
+      KeyValueWriter partsWriter = allParts.getWriter();
+      Map<String, AtomicInteger> unionKv = Maps.newHashMap();
+      LogicalInput union = inputs.get("union");
+      KeyValuesReader kvReader = (KeyValuesReader) union.getReader();
+      while (kvReader.next()) {
+        String word = ((Text) kvReader.getCurrentKey()).toString();
+        // NOTE(review): only the FIRST value of each key group is read; this
+        // relies on the symmetric handling of the map3 side below — confirm.
+        IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
+        for (int i=0; i<intVal.get(); ++i) {
+          partsWriter.write(word, one);
+        }
+        AtomicInteger value = unionKv.get(word);
+        if (value == null) {
+          unionKv.put(word, new AtomicInteger(intVal.get()));
+        } else {
+          value.addAndGet(intVal.get());
+        }
+      }
+      LogicalInput map3 = inputs.get("map3");
+      kvReader = (KeyValuesReader) map3.getReader();
+      while (kvReader.next()) {
+        String word = ((Text) kvReader.getCurrentKey()).toString();
+        IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
+        AtomicInteger value = unionKv.get(word);
+        if  (value == null) {
+          // Every word seen by map3 must also have come through the union.
+          throw new TezUncheckedException("Expected to exist: " + word);
+        } else {
+          // Subtract 2x the map3 count; the union side contributed 2x
+          // (once per group member), so the balance must reach zero.
+          value.getAndAdd(intVal.get()*-2);
+        }
+      }
+      for (AtomicInteger value : unionKv.values()) {
+        if (value.get() != 0) {
+          throw new TezUncheckedException("Unexpected non-zero value");
+        }
+      }
+      // Emit the number of distinct words as the final result record.
+      kvWriter.write("Union", new IntWritable(unionKv.size()));
+      if (out.isCommitRequired()) {
+        while (!context.canCommit()) {
+          Thread.sleep(100);
+        }
+        out.commit();
+      }
+      if (allParts.isCommitRequired()) {
+        while (!context.canCommit()) {
+          Thread.sleep(100);
+        }
+        allParts.commit();
+      }
+    }
+    
+  }
+  
+  /**
+   * Builds the union-example DAG: map1 and map2 form a VertexGroup named
+   * "union" feeding the "checker" vertex via a GroupInputEdge (merged with
+   * ConcatenatedMergedKeyValuesInput), while map3 feeds "checker" through a
+   * plain scatter-gather edge. The group writes a shared "parts" MROutput;
+   * "checker" writes the "union" and "all-parts" MROutputs.
+   *
+   * NOTE(review): fs, localResources and stagingDir are currently unused
+   * parameters — confirm whether they can be dropped from the signature.
+   */
+  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
+      Map<String, LocalResource> localResources, Path stagingDir,
+      String inputPath, String outputPath) throws IOException {
+    // Map-stage configuration: Text/IntWritable intermediate pairs, with
+    // split grouping handled by TezGroupedSplitsInputFormat.
+    Configuration mapStageConf = new JobConf((Configuration)tezConf);
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, 
+        TezGroupedSplitsInputFormat.class.getName());
+
+    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+    mapStageConf.setBoolean("mapred.mapper.new-api", true);
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
+        null);
+
+    // Reduce-stage configuration: plain text output to outputPath.
+    Configuration finalReduceConf = new JobConf((Configuration)tezConf);
+    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        TextOutputFormat.class.getName());
+    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
+    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
+        mapStageConf);
+
+    MRHelpers.doJobClientMagic(mapStageConf);
+    MRHelpers.doJobClientMagic(finalReduceConf);
+
+    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, 
+            TextInputFormat.class.getName());
+    // -1 tasks: parallelism is presumably decided at runtime by the
+    // MRInputAMSplitGenerator initializer — TODO confirm.
+    int numMaps = -1;
+    Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
+        TokenProcessor.class.getName()),
+        numMaps, MRHelpers.getMapResource(mapStageConf));
+    mapVertex1.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    Map<String, String> mapEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    mapVertex1.setTaskEnvironment(mapEnv);
+    Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
+    InputDescriptor id = new InputDescriptor(MRInput.class.getName()).
+        setUserPayload(mapInputPayload);
+    mapVertex1.addInput("MRInput", id, initializerClazz);
+
+    // map2 and map3 are identical readers of the same input; the same mapEnv
+    // map and InputDescriptor instances are reused for all three vertices.
+    Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
+        TokenProcessor.class.getName()),
+        numMaps, MRHelpers.getMapResource(mapStageConf));
+    mapVertex2.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    mapVertex2.setTaskEnvironment(mapEnv);
+    mapVertex2.addInput("MRInput", id, initializerClazz);
+
+    Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
+        TokenProcessor.class.getName()),
+        numMaps, MRHelpers.getMapResource(mapStageConf));
+    mapVertex3.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    mapVertex3.setTaskEnvironment(mapEnv);
+    mapVertex3.addInput("MRInput", id, initializerClazz);
+    
+    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+    // Single-task checker vertex running the UnionProcessor verifier.
+    Vertex checkerVertex = new Vertex("checker",
+        new ProcessorDescriptor(
+            UnionProcessor.class.getName()).setUserPayload(finalReducePayload),
+                1, MRHelpers.getReduceResource(finalReduceConf));
+    checkerVertex.setJavaOpts(
+        MRHelpers.getReduceJavaOpts(finalReduceConf));
+    Map<String, String> reduceEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
+    checkerVertex.setTaskEnvironment(reduceEnv);
+    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
+      .setUserPayload(finalReducePayload);
+    checkerVertex.addOutput("union", od, MROutputCommitter.class);
+
+    // Extra side outputs go to "<outputPath>-parts" (from the vertex group)
+    // and "<outputPath>-all-parts" (from the checker).
+    Configuration partsConf = new Configuration(finalReduceConf);
+    partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
+    byte[] partsPayload = MRHelpers.createUserPayloadFromConf(partsConf);
+    
+    DAG dag = new DAG("UnionExample");
+    
+    // The VertexGroup abstraction introduced by TEZ-678: map1+map2 act as a
+    // single logical source named "union".
+    VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
+    OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
+      .setUserPayload(partsPayload);
+    Configuration allPartsConf = new Configuration(finalReduceConf);
+    allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
+    byte[] allPartsPayload = MRHelpers.createUserPayloadFromConf(allPartsConf);
+    OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
+      .setUserPayload(allPartsPayload);
+    unionVertex.addOutput("parts", od1, MROutputCommitter.class);
+    checkerVertex.addOutput("all-parts", od2, MROutputCommitter.class);
+    
+    
+    // Group members are added individually; the group itself is wired to the
+    // checker through a GroupInputEdge whose merged input concatenates the
+    // members' shuffled streams.
+    dag.addVertex(mapVertex1)
+        .addVertex(mapVertex2)
+        .addVertex(mapVertex3)
+        .addVertex(checkerVertex)
+        .addEdge(
+            new Edge(mapVertex3, checkerVertex, new EdgeProperty(
+                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+                SchedulingType.SEQUENTIAL, 
+                new OutputDescriptor(OnFileSortedOutput.class.getName())
+                        .setUserPayload(mapPayload), 
+                new InputDescriptor(ShuffledMergedInput.class.getName())
+                        .setUserPayload(finalReducePayload))))
+        .addEdge(
+            new GroupInputEdge(unionVertex, checkerVertex, new EdgeProperty(
+                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+                SchedulingType.SEQUENTIAL,
+                new OutputDescriptor(OnFileSortedOutput.class.getName())
+                    .setUserPayload(mapPayload), 
+                new InputDescriptor(ShuffledMergedInput.class.getName())
+                    .setUserPayload(finalReducePayload)),
+                new InputDescriptor(
+                    ConcatenatedMergedKeyValuesInput.class.getName())));
+    return dag;  
+  }
+
+  /**
+   * Blocks until the Tez session reports READY, polling every 100ms.
+   * Returns silently if the polling thread is interrupted.
+   *
+   * @throws RuntimeException if the session is already SHUTDOWN
+   */
+  private static void waitForTezSessionReady(TezSession tezSession)
+      throws IOException, TezException {
+      while (true) {
+        TezSessionStatus status = tezSession.getSessionStatus();
+        if (status.equals(TezSessionStatus.SHUTDOWN)) {
+          throw new RuntimeException("TezSession has already shutdown");
+        }
+        if (status.equals(TezSessionStatus.READY)) {
+          return;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // Treat interruption as a request to stop waiting.
+          return;
+        }
+      }
+    }
+
+  /** Prints the command-line usage string to stderr. */
+  private static void printUsage() {
+    System.err.println("Usage: " + " unionexample <in1> <out1>");
+  }
+
+  private Credentials credentials = new Credentials();
+  
+  /**
+   * Runs the example end-to-end: prepares configuration and the staging
+   * directory, obtains HDFS tokens, starts a Tez session, submits the union
+   * DAG and polls for completion, printing progress along the way.
+   *
+   * @param conf optional base configuration; a fresh TezConfiguration is
+   *             created when null
+   * @return true iff the DAG finished in SUCCEEDED state
+   */
+  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception
{
+    System.out.println("Running UnionExample");
+    // conf and UGI
+    TezConfiguration tezConf;
+    if (conf != null) {
+      tezConf = new TezConfiguration(conf);
+    } else {
+      tezConf = new TezConfiguration();
+    }
+    UserGroupInformation.setConfiguration(tezConf);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    TezClient tezClient = new TezClient(tezConf);
+    ApplicationId appId = tezClient.createApplication();
+    
+    // staging dir
+    // NOTE(review): "+ Path.SEPARATOR + Path.SEPARATOR" produces a doubled
+    // separator before the appId — likely harmless but worth cleaning up.
+    FileSystem fs = FileSystem.get(tezConf);
+    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
+        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+        + Path.SEPARATOR + appId.toString();    
+    Path stagingDir = new Path(stagingDirStr);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+    stagingDir = fs.makeQualified(stagingDir);
+    
+    // security
+    TokenCache.obtainTokensForNamenodes(credentials, new Path[] {stagingDir}, tezConf);
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        MRHelpers.getMRAMJavaOpts(tezConf));
+
+    // No need to add jar containing this class as assumed to be part of
+    // the tez jars.
+
+    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging
dir
+    // is the same filesystem as the one used for Input/Output.
+    
+    TezSession tezSession = null;
+    AMConfiguration amConfig = new AMConfiguration(null,
+        null, tezConf, credentials);
+    
+    TezSessionConfiguration sessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    tezSession = new TezSession("UnionExampleSession", appId,
+        sessionConfig);
+    tezSession.start();
+
+    DAGStatus dagStatus = null;
+    DAGClient dagClient = null;
+    // Vertex names used when printing per-vertex progress.
+    String[] vNames = { "map1", "map2", "map3", "checker" };
+
+    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    try {
+        if (fs.exists(new Path(outputPath))) {
+          throw new FileAlreadyExistsException("Output directory "
+              + outputPath + " already exists");
+        }
+        
+        Map<String, LocalResource> localResources =
+          new TreeMap<String, LocalResource>();
+        
+        DAG dag = createDAG(fs, tezConf, localResources,
+            stagingDir, inputPath, outputPath);
+
+        waitForTezSessionReady(tezSession);
+        dagClient = tezSession.submitDAG(dag);
+        //dagClient = tezClient.submitDAGApplication(dag, amConfig);
+
+        // monitoring
+        // First wait until the DAG leaves its initial state.
+        while (true) {
+          dagStatus = dagClient.getDAGStatus(statusGetOpts);
+          if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+              dagStatus.getState() == DAGStatus.State.FAILED ||
+              dagStatus.getState() == DAGStatus.State.KILLED ||
+              dagStatus.getState() == DAGStatus.State.ERROR) {
+            break;
+          }
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            // continue;
+          }
+        }
+
+
+        // Then print progress once a second until it reaches a terminal state.
+        while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+          try {
+            ExampleDriver.printDAGStatus(dagClient, vNames);
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // continue;
+            }
+            dagStatus = dagClient.getDAGStatus(statusGetOpts);
+          } catch (TezException e) {
+            System.exit(-1);
+          }
+        }
+        ExampleDriver.printDAGStatus(dagClient, vNames,
+            true, true);
+        System.out.println("DAG completed. " + "FinalState=" + dagStatus.getState());
+        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+          return false;
+        }
+        return true;
+    } finally {
+      // Always clean the staging dir and stop the session, even on failure.
+      fs.delete(stagingDir, true);
+      tezSession.stop();
+    }
+  }
+
+  /**
+   * Command-line entry point. Expects exactly two arguments: an input path
+   * and an output path.
+   */
+  public static void main(String[] args) throws Exception {
+    // Require exactly <in1> <out1>. The previous check, (args.length % 2) != 0,
+    // also accepted zero arguments and then failed with an
+    // ArrayIndexOutOfBoundsException on args[0] below.
+    if (args.length != 2) {
+      printUsage();
+      System.exit(2);
+    }
+    UnionExample job = new UnionExample();
+    job.run(args[0], args[1], null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 7e474c1..d21bd96 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -93,6 +93,7 @@ public class MROutputCommitter extends OutputCommitter {
     committer.abortJob(jobContext, jobState);
   }
 
+  @SuppressWarnings("rawtypes")
   private org.apache.hadoop.mapreduce.OutputCommitter
       getOutputCommitter(OutputCommitterContext context) {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index e0efd45..e0a4c68 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -205,7 +205,7 @@ public class MapUtils {
         vertexName,
         mapProcessorDesc,
         inputSpecs,
-        outputSpecs);
+        outputSpecs, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index abefcce..1981872 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -169,7 +169,7 @@ public class TestReduceProcessor {
         reduceVertexName,
         reduceProcessorDesc,
         Collections.singletonList(reduceInputSpec),
-        Collections.singletonList(reduceOutputSpec));
+        Collections.singletonList(reduceOutputSpec), null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index a624778..cb6287e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -49,11 +51,13 @@ import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Output;
 import org.apache.tez.runtime.api.Processor;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -67,6 +71,8 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @Private
@@ -84,6 +90,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final ConcurrentHashMap<String, LogicalOutput> outputsMap;
   private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
   
+  private final List<GroupInputSpec> groupInputSpecs;
+  private ConcurrentHashMap<String, LogicalInput> groupInputsMap;
+  
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
   private TezProcessorContext processorContext;
@@ -134,6 +143,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             .setNameFormat("Initializer %d").build());
     this.initializerCompletionService = new ExecutorCompletionService<Void>(
         this.initializerExecutor);
+    this.groupInputSpecs = taskSpec.getGroupInputs();
   }
 
   public void initialize() throws Exception {
@@ -179,14 +189,31 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
     LOG.info("All initializers finished");
 
+  // group inputs depend on inputs being initialized. So must be done after.
+    initializeGroupInputs();
+    
+    Set<String> groupInputs = Sets.newHashSet();
     // Construct Inputs/Outputs map argument for processor.run()
+    // first add the group inputs
+    if (groupInputSpecs !=null && !groupInputSpecs.isEmpty()) {
+      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
+        runInputMap.put(groupInputSpec.getGroupName(), 
+                                 groupInputsMap.get(groupInputSpec.getGroupName()));
+        groupInputs.addAll(groupInputSpec.getGroupVertices());
+      }
+    }
+    // then add the non-grouped inputs
     for (InputSpec inputSpec : inputSpecs) {
-      LogicalInput input = inputsMap.get(inputSpec.getSourceVertexName());
-      runInputMap.put(inputSpec.getSourceVertexName(), input);
+      if (!groupInputs.contains(inputSpec.getSourceVertexName())) {
+        LogicalInput input = inputsMap.get(inputSpec.getSourceVertexName());
+        runInputMap.put(inputSpec.getSourceVertexName(), input);
+      }
     }
+    
     for (OutputSpec outputSpec : outputSpecs) {
       LogicalOutput output = outputsMap.get(outputSpec.getDestinationVertexName());
-      runOutputMap.put(outputSpec.getDestinationVertexName(), output);
+      String outputName = outputSpec.getDestinationVertexName();
+      runOutputMap.put(outputName, output);
     }
     
     // TODO Maybe close initialized inputs / outputs in case of failure to
@@ -305,6 +332,23 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
+  /**
+   * Builds one MergedLogicalInput per GroupInputSpec, wrapping the already
+   * created member inputs from inputsMap. Must run after regular input
+   * initialization because the merged input wraps those instances.
+   */
+  private void initializeGroupInputs() {
+    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
+     groupInputsMap = new ConcurrentHashMap<String, LogicalInput>(groupInputSpecs.size());
+     for (GroupInputSpec groupInputSpec : groupInputSpecs) {
+        LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
+        MergedLogicalInput groupInput = (MergedLogicalInput) createInputFromDescriptor(
+            groupInputSpec.getMergedInputDescriptor());
+        List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
+        // Collect the member vertices' inputs in group order and hand them
+        // to the merged input for initialization.
+        for (String groupVertex : groupInputSpec.getGroupVertices()) {
+          inputs.add(inputsMap.get(groupVertex));
+        }
+        groupInput.initialize(inputs);
+        groupInputsMap.put(groupInputSpec.getGroupName(), groupInput);
+      }
+    }
+  }
+  
   private void initializeLogicalIOProcessor() throws Exception {
     LOG.info("Initializing processor" + ", processorClassName="
         + processorDescriptor.getClassName());
@@ -349,16 +393,19 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private LogicalInput createInput(InputSpec inputSpec) {
     LOG.info("Creating Input");
-    Input input = RuntimeUtils.createClazzInstance(inputSpec
-        .getInputDescriptor().getClassName());
+    // Delegate to the descriptor-based factory shared with group inputs.
+    return createInputFromDescriptor(inputSpec.getInputDescriptor());
+  }
+
+  /**
+   * Instantiates the LogicalInput class named by the descriptor. Shared by
+   * regular vertex inputs and merged group inputs.
+   *
+   * @throws TezUncheckedException if the configured class is not a LogicalInput
+   */
+  private LogicalInput createInputFromDescriptor(InputDescriptor inputDesc) {
+    Input input = RuntimeUtils.createClazzInstance(inputDesc.getClassName());
     if (!(input instanceof LogicalInput)) {
-      throw new TezUncheckedException(input.getClass().getName()
+      // Report the class name configured on the descriptor; the descriptor's
+      // own runtime class (inputDesc.getClass()) would always print
+      // "InputDescriptor" and hide the offending class.
+      throw new TezUncheckedException(inputDesc.getClassName()
           + " is not a sub-type of LogicalInput."
           + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
     }
     return (LogicalInput)input;
   }
-
+  
   private LogicalOutput createOutput(OutputSpec outputSpec) {
     LOG.info("Creating Output");
     Output output = RuntimeUtils.createClazzInstance(outputSpec

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/GroupInputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/GroupInputSpec.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/GroupInputSpec.java
new file mode 100644
index 0000000..83c80bd
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/GroupInputSpec.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.dag.api.InputDescriptor;
+
+import com.google.common.collect.Lists;
+
+public class GroupInputSpec implements Writable{
+
+  private String groupName;
+  private List<String> groupVertices;
+  private InputDescriptor mergedInputDescriptor;
+  
+  public GroupInputSpec() {
+    // for Writable
+  }
+  
+  public String getGroupName() {
+    return groupName;
+  }
+  
+  public List<String> getGroupVertices() {
+    return groupVertices;
+  }
+  
+  public InputDescriptor getMergedInputDescriptor() {
+    return mergedInputDescriptor;
+  }
+  
+  public GroupInputSpec(String groupName, List<String> groupVertices, InputDescriptor
inputDescriptor) {
+    this.groupName = StringInterner.weakIntern(groupName);
+    this.groupVertices = groupVertices;
+    this.mergedInputDescriptor = inputDescriptor;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, groupName);
+    out.writeInt(groupVertices.size());
+    for (String s : groupVertices) {
+      Text.writeString(out, s);
+    }
+    mergedInputDescriptor.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    groupName = StringInterner.weakIntern(Text.readString(in));
+    int numMembers = in.readInt();
+    groupVertices = Lists.newArrayListWithCapacity(numMembers);
+    for (int i=0; i<numMembers; ++i) {
+      groupVertices.add(StringInterner.weakIntern(Text.readString(in)));
+    }
+    mergedInputDescriptor = new InputDescriptor();
+    mergedInputDescriptor.readFields(in);
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("Group: " + groupName + " { ");
+    for (String s: groupVertices) {
+      sb.append(s + " ");
+    }
+    sb.append("} MergedInputDescriptor: " + mergedInputDescriptor.getClassName());
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index bc93a70..8752521 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
+import com.google.common.collect.Lists;
+
 public class TaskSpec implements Writable {
 
   private TezTaskAttemptID taskAttemptId;
@@ -35,18 +37,21 @@ public class TaskSpec implements Writable {
   private ProcessorDescriptor processorDescriptor;
   private List<InputSpec> inputSpecList;
   private List<OutputSpec> outputSpecList;
+  private List<GroupInputSpec> groupInputSpecList;
 
   public TaskSpec() {
   }
 
   public TaskSpec(TezTaskAttemptID taskAttemptID,
       String vertexName, ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, 
+      List<GroupInputSpec> groupInputSpecList) {
     this.taskAttemptId = taskAttemptID;
     this.vertexName = StringInterner.weakIntern(vertexName);
     this.processorDescriptor = processorDescriptor;
     this.inputSpecList = inputSpecList;
     this.outputSpecList = outputSpecList;
+    this.groupInputSpecList = groupInputSpecList;
   }
 
   public String getVertexName() {
@@ -68,6 +73,10 @@ public class TaskSpec implements Writable {
   public List<OutputSpec> getOutputs() {
     return outputSpecList;
   }
+  
+  public List<GroupInputSpec> getGroupInputs() {
+    return groupInputSpecList;
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {
@@ -82,12 +91,20 @@ public class TaskSpec implements Writable {
     for (OutputSpec outputSpec : outputSpecList) {
       outputSpec.write(out);
     }
+    if (groupInputSpecList != null && !groupInputSpecList.isEmpty()) {
+      out.writeBoolean(true);
+      out.writeInt(groupInputSpecList.size());
+      for (GroupInputSpec group : groupInputSpecList) {
+        group.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     taskAttemptId = TezTaskAttemptID.readTezTaskAttemptID(in);
-    // TODO ZZZ Intern this.
     vertexName = StringInterner.weakIntern(in.readUTF());
     // TODO TEZ-305 convert this to PB
     processorDescriptor = new ProcessorDescriptor();
@@ -106,6 +123,16 @@ public class TaskSpec implements Writable {
       outputSpec.readFields(in);
       outputSpecList.add(outputSpec);
     }
+    boolean hasGroupInputs = in.readBoolean();
+    if (hasGroupInputs) {
+      int numGroups = in.readInt();
+      groupInputSpecList = Lists.newArrayListWithCapacity(numGroups);
+      for (int i=0; i<numGroups; ++i) {
+        GroupInputSpec group = new GroupInputSpec();
+        group.readFields(in);
+        groupInputSpecList.add(group);
+      }
+    }
   }
 
   @Override
@@ -125,6 +152,13 @@ public class TaskSpec implements Writable {
       sb.append("{" + i.toString() + "}, ");
     }
     sb.append("]");
+    if (groupInputSpecList != null && !groupInputSpecList.isEmpty()) {
+      sb.append(" groupInputSpecList=[");
+      for (GroupInputSpec group : groupInputSpecList) {
+        sb.append("{" + group.toString() + "}, ");
+      }
+      sb.append("]");
+    }
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
new file mode 100644
index 0000000..bc380e5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
+
+  public class MergedKeyValuesReader implements KeyValuesReader {
+    private int currentReaderIndex = 0;
+    private KeyValuesReader currentReader;
+    
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValuesReader)) {
+            throw new TezUncheckedException("Expected KeyValuesReader. "
+                + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValuesReader) reader;
+          currentReaderIndex++;
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentReader.getCurrentValues();
+    }
+    
+  }
+    
+  @Override
+  public Reader getReader() throws Exception {
+    return new MergedKeyValuesReader();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e231264/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index c444cec..990dbe6 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -19,6 +19,7 @@
 package org.apache.tez.mapreduce;
 
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -26,6 +27,7 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,6 +61,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -73,6 +76,7 @@ import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
+import org.apache.tez.mapreduce.examples.UnionExample;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -530,4 +534,26 @@ public class TestMRRJobsDAGApi {
     return LocalResource.newInstance(resourceURL, type, visibility,
         resourceSize, resourceModificationTime);
   }
+  
+  @Test(timeout = 60000)
+  public void testVertexGroups() throws Exception {
+    LOG.info("Running Group Test");
+    Path inPath = new Path(TEST_ROOT_DIR, "in");
+    Path outPath = new Path(TEST_ROOT_DIR, "out");
+    FSDataOutputStream out = remoteFs.create(inPath);
+    OutputStreamWriter writer = new OutputStreamWriter(out);
+    writer.write("abcd ");
+    writer.write("efgh ");
+    writer.write("abcd ");
+    writer.write("efgh ");
+    writer.close();
+    out.close();
+    
+    UnionExample job = new UnionExample();
+    if (job.run(inPath.toString(), outPath.toString(), mrrTezCluster.getConfig())) {
+      LOG.info("Success VertexGroups Test");
+    } else {
+      throw new TezUncheckedException("VertexGroups Test Failed");
+    }
+  }
 }


Mime
View raw message