tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1471138 - in /incubator/tez/branches/TEZ-1: tez-engine-api/src/main/java/org/apache/tez/engine/api/ tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ tez-en...
Date Tue, 23 Apr 2013 20:52:27 GMT
Author: hitesh
Date: Tue Apr 23 20:52:27 2013
New Revision: 1471138

URL: http://svn.apache.org/r1471138
Log:
TEZ-73. Change task engine apis to allow multiple inputs and outputs. (hitesh)

Modified:
    incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
    incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java

Modified: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java	Tue Apr 23 20:52:27 2013
@@ -48,7 +48,7 @@ public interface Processor {
    * @throws IOException
    * @throws InterruptedException
    */
-  public void process(Input in, Output  out) 
+  public void process(Input[] in, Output[]  out)
       throws IOException, InterruptedException;
 
   /**

Modified: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java	Tue Apr 23 20:52:27 2013
@@ -44,19 +44,19 @@ public interface Task {
    * Get {@link Input} of the task.
    * @return <code>Input</code> of the task
    */
-  public Input getInput();
+  public Input[] getInputs();
 
   /**
-   * Get {@link Processor} of the task.
-   * @return <code>Processor</code> of the task
+   * Get {@link Processor}s of the task.
+   * @return <code>Processor</code>s of the task
    */
   public Processor getProcessor();
 
   /**
-   * Get {@link Output} of the task.
-   * @return <code>Output</code> of the task
+   * Get {@link Output}s of the task.
+   * @return <code>Output</code>s of the task
    */
-  public Output getOutput();
+  public Output[] getOutputs();
 
   /**
    * Run the {@link Task}.

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java	Tue Apr 23 20:52:27 2013
@@ -45,6 +45,8 @@ import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.combine.CombineInput;
@@ -372,7 +374,8 @@ public class MergeManager {
     CombineOutput combineOut = new CombineOutput(writer);
     combineOut.initialize(conf, reporter);
     
-    combineProcessor.process(combineIn, combineOut);
+    combineProcessor.process(new Input[] {combineIn},
+        new Output[] {combineOut});
     
     combineIn.close();
     combineOut.close();

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java	Tue Apr 23 20:52:27 2013
@@ -44,7 +44,9 @@ import org.apache.tez.common.RunningTask
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Partitioner;
 import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
@@ -189,7 +191,8 @@ public abstract class ExternalSorter {
     CombineOutput combineOut = new CombineOutput(writer);
     combineOut.initialize(job, runningTaskContext.getTaskReporter());
 
-    combineProcessor.process(combineIn, combineOut);
+    combineProcessor.process(new Input[] {combineIn},
+        new Output[] {combineOut});
 
     combineIn.close();
     combineOut.close();

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java	Tue Apr 23 20:52:27 2013
@@ -32,8 +32,8 @@ import com.google.inject.assistedinject.
 public class RuntimeTask 
 implements Task {
 
-  private final Input in;
-  private final Output out;
+  private final Input[] inputs;
+  private final Output[] outputs;
   private final Processor processor;
   
   private Configuration conf;
@@ -41,12 +41,21 @@ implements Task {
   
   @Inject
   public RuntimeTask(
-      @Assisted Processor processor, 
-      @Assisted Input in, 
+      @Assisted Processor processor,
+      @Assisted Input in,
       @Assisted Output out) {
-    this.in = in;
+    this(processor,
+        new Input[] {in},
+        new Output[] {out});
+  }
+
+  public RuntimeTask(
+      Processor processor,
+      Input[] inputs,
+      Output[] outputs) {
+    this.inputs = inputs;
     this.processor = processor;
-    this.out = out;
+    this.outputs = outputs;
   }
 
   public void initialize(Configuration conf, Master master) throws IOException,
@@ -59,8 +68,8 @@ implements Task {
   }
 
   @Override
-  public Input getInput() {
-    return in;
+  public Input[] getInputs() {
+    return inputs;
   }
 
   @Override
@@ -69,12 +78,12 @@ implements Task {
   }
 
   @Override
-  public Output getOutput() {
-    return out;
+  public Output[] getOutputs() {
+    return outputs;
   }
 
   public void run() throws IOException, InterruptedException {
-    processor.process(in, out);
+    processor.process(inputs, outputs);
   }
 
   public void close() throws IOException, InterruptedException {

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java	Tue Apr 23 20:52:27 2013
@@ -84,14 +84,23 @@ public class MapProcessor extends MRTask
 
   @Override
   public void process(
-      final Input in, 
-      final Output out)
+      final Input[] ins,
+      final Output[] outs)
           throws IOException, InterruptedException {
     MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
     boolean useNewApi = jobConf.getUseNewMapper();
     initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),
         reporter, useNewApi);
 
+    if (ins.length != 1
+        || outs.length != 1) {
+      throw new IOException("Cannot handle multiple inputs or outputs"
+          + ", inputCount=" + ins.length
+          + ", outputCount=" + outs.length);
+    }
+    Input in = ins[0];
+    Output out = outs[0];
+
     if (in instanceof SimpleInput) {
       ((SimpleInput)in).setTask(this);
     }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java	Tue Apr 23 20:52:27 2013
@@ -95,13 +95,22 @@ implements Processor {
   }
 
   @Override
-  public void process(Input in, Output out)
+  public void process(Input[] ins, Output[] outs)
       throws IOException, InterruptedException {
     MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
     boolean useNewApi = jobConf.getUseNewMapper();
     initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),
         reporter, useNewApi);
 
+    if (ins.length != 1
+        || outs.length != 1) {
+      throw new IOException("Cannot handle multiple inputs or outputs"
+          + ", inputCount=" + ins.length
+          + ", outputCount=" + outs.length);
+    }
+    Input in = ins[0];
+    Output out = outs[0];
+
     if (in instanceof SimpleInput) {
       ((SimpleInput)in).setTask(this);
     } else if (in instanceof ShuffledMergedInput) {

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java	Tue Apr 23 20:52:27 2013
@@ -100,8 +100,8 @@ public class TestTaskModules {
 
   static class TestTask implements Task {
 
-    private final Input in;
-    private final Output out;
+    private final Input[] ins;
+    private final Output[] outs;
     private final Processor processor;
     
     @Inject
@@ -109,27 +109,36 @@ public class TestTaskModules {
         @Assisted Processor processor, 
         @Assisted Input in, 
         @Assisted Output out) {
-      this.in = in;
+      this(processor, new Input[] {in},
+          new Output[] {out});
+    }
+
+    @Inject
+    public TestTask(
+        Processor processor,
+        Input[] ins,
+        Output[] outs) {
+      this.ins = ins;
       this.processor = processor;
-      this.out = out;
+      this.outs = outs;
     }
-    
+
     @Override
     public void initialize(Configuration conf, Master master)
         throws IOException, InterruptedException {
-      LOG.info("in = " + in.getClass());
+      LOG.info("in = " + ins[0].getClass());
       LOG.info("processor = " + processor.getClass());
-      LOG.info("out = " + out.getClass());
+      LOG.info("out = " + outs[0].getClass());
     }
 
     @Override
-    public Input getInput() {
-      return in;
+    public Input[] getInputs() {
+      return ins;
     }
 
     @Override
-    public Output getOutput() {
-      return out;
+    public Output[] getOutputs() {
+      return outs;
     }
 
     @Override

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java	Tue Apr 23 20:52:27 2013
@@ -106,10 +106,10 @@ public class MapUtils {
     TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
     Task t = factory.createTask(taskContext);
     t.initialize(jobConf, umbilical);
-    SimpleInput real = ((SimpleInput)t.getInput());
-    SimpleInput in = spy(real);
-    doReturn(split).when(in).getOldSplitDetails(any(TaskSplitIndex.class));
-    t.getProcessor().process(in, t.getOutput());
+    SimpleInput[] real = ((SimpleInput[])t.getInputs());
+    SimpleInput[] inputs = spy(real);
+    doReturn(split).when(inputs[0]).getOldSplitDetails(any(TaskSplitIndex.class));
+    t.getProcessor().process(inputs, t.getOutputs());
     return t;
   }
 

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java?rev=1471138&r1=1471137&r2=1471138&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java	Tue Apr 23 20:52:27 2013
@@ -20,8 +20,6 @@ package org.apache.tez.mapreduce.process
 
 import java.io.IOException;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +43,7 @@ import org.jboss.netty.buffer.BigEndianH
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.TruncatedChannelBuffer;
 import org.jboss.netty.handler.stream.ChunkedStream;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -125,16 +124,16 @@ public class TestMapProcessor {
         MapUtils.runMapProcessor(
             localFs, workDir, job, 0, new Path(workDir, "map0"), 
         new InitialTaskWithInMemSort(), new TestUmbilicalProtocol(true));
-    InMemorySortedOutput output = (InMemorySortedOutput)t.getOutput();
+    InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
     
-    verifyInMemSortedStream(output, 0, 4096);
+    verifyInMemSortedStream(outputs[0], 0, 4096);
     int i = 0;
     for (i = 2; i < 256; i <<= 1) {
-      verifyInMemSortedStream(output, 0, i);
+      verifyInMemSortedStream(outputs[0], 0, i);
     }
-    verifyInMemSortedStream(output, 1, 4096);
+    verifyInMemSortedStream(outputs[0], 1, 4096);
     for (i = 2; i < 256; i <<= 1) {
-      verifyInMemSortedStream(output, 1, i);
+      verifyInMemSortedStream(outputs[0], 1, i);
     }
 
     t.close();



Mime
View raw message