tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1471799 - in /incubator/tez/branches/TEZ-1: tez-dag-api/src/main/java/org/apache/tez/dag/api/ tez-dag/src/main/java/org/apache/hadoop/mapred/ tez-engine/src/main/java/org/apache/tez/common/ tez-engine/src/main/java/org/apache/tez/engine/ru...
Date Thu, 25 Apr 2013 01:03:38 GMT
Author: hitesh
Date: Thu Apr 25 01:03:37 2013
New Revision: 1471799

URL: http://svn.apache.org/r1471799
Log:
TEZ-82. Using Processor/InputSpec/OutputSpec to create the runtime task. (hitesh)

Added:
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java (with props)
Removed:
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
Modified:
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java Thu Apr 25 01:03:37 2013
@@ -50,8 +50,8 @@ public class EdgeProperty { // FIXME ren
   public EdgeProperty() {
     this.connectionPattern = ConnectionPattern.BIPARTITE;
     this.sourceType = SourceType.STABLE;
-    this.inputClass = "ShuffleInput.class"; //FIXME
-    this.outputClass = "SortedMapOutputBuffer.class"; //FIXME
+    this.inputClass = "org.apache.tez.engine.lib.input.ShuffledMergedInput";
+    this.outputClass = "org.apache.tez.engine.lib.output.OnFileSortedOutput";
   }
   
   public ConnectionPattern getConnectionPattern() {

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Thu Apr 25 01:03:37 2013
@@ -24,11 +24,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.crypto.SecretKey;
 
@@ -55,7 +58,6 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
@@ -66,23 +68,26 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.records.TezTaskAttemptID;
-import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.hadoop.ContainerContext;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
 import org.apache.tez.mapreduce.processor.MRTask;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
 /**
  * The main() for TEZ MapReduce task processes.
  */
@@ -424,32 +429,24 @@ public class YarnTezDagChild {
     // submit configuration parameters to the AM and effectively tasks via RPC.
 
     // TODO Avoid all this extra config manipulation.
+    // FIXME we need I/O/p level configs to be used in init below
     final JobConf job = new JobConf(conf);
     job.setCredentials(credentials);
-    
-    // Create the appropriate guice task-module
-    AbstractModule taskModule = null;
-    LOG.info("Using Module: " + taskContext.getTaskModuleClassName());
-    try {
-      Class<?> moduleClazz = Class
-          .forName(taskContext.getTaskModuleClassName());
-      if (AbstractModule.class.isAssignableFrom(moduleClazz)) {
-        taskModule = (AbstractModule) ReflectionUtils.newInstance(moduleClazz,
-            job);
-      } else {
-        throw new YarnException("Module class: " + moduleClazz.getName()
-            + " should be an instance of "
-            + AbstractModule.class.getCanonicalName());
-      }
-    } catch (ClassNotFoundException e) {
-      throw new YarnException("Unable to load moduleClass: "
-          + taskContext.getTaskModuleClassName(), e);
-    }
 
-    // Use the injector to create & bind input, processor, output & task
-    Injector injector = Guice.createInjector(taskModule);
-    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
-    Task t = factory.createTask(taskContext);
+    // FIXME need Input/Output vertices else we have this hack
+    if (taskContext.getInputSpecList().isEmpty()) {
+      taskContext.getInputSpecList().add(
+          new InputSpec("null", 0, SimpleInput.class.getName()));
+    }
+    if (taskContext.getOutputSpecList().isEmpty()) {
+      taskContext.getOutputSpecList().add(
+          new OutputSpec("null", 0, SimpleOutput.class.getName()));
+    }
+    Task t = RuntimeUtils.createRuntimeTask(taskContext);
+    
+    // FIXME wrapper should initialize all of processor, inputs and outputs
+    // Currently, processor is inited via task init
+    // and processor then inits inputs and outputs
     t.initialize(job, master);
     
     MRTask task = (MRTask)t.getProcessor();

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java Thu Apr 25 01:03:37 2013
@@ -32,7 +32,7 @@ public class TezEngineTaskContext extend
   // These two could be replaced by a TezConfiguration / DagSpec.
   private List<InputSpec> inputSpecList;
   private List<OutputSpec> outputSpecList;
-  private String taskModuleClassName;
+  private String processorName;
   
   public TezEngineTaskContext() {
     super();
@@ -52,11 +52,11 @@ public class TezEngineTaskContext extend
     }
     this.inputSpecList = inputSpecList;
     this.outputSpecList = outputSpecList;
-    this.taskModuleClassName = moduleClassName;
+    this.processorName = moduleClassName;
   }
 
-  public String getTaskModuleClassName() {
-    return taskModuleClassName;
+  public String getProcessorName() {
+    return processorName;
   }
   
   public List<InputSpec> getInputSpecList() {
@@ -70,7 +70,7 @@ public class TezEngineTaskContext extend
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    Text.writeString(out, taskModuleClassName);
+    Text.writeString(out, processorName);
     out.writeInt(inputSpecList.size());
     for (InputSpec inputSpec : inputSpecList) {
       inputSpec.write(out);
@@ -85,7 +85,7 @@ public class TezEngineTaskContext extend
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     
-    taskModuleClassName = Text.readString(in);
+    processorName = Text.readString(in);
     int numInputSpecs = in.readInt();
     inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
     for (int i = 0; i < numInputSpecs; i++) {
@@ -105,7 +105,7 @@ public class TezEngineTaskContext extend
   @Override
   public String toString() {
     StringBuffer sb = new StringBuffer();
-    sb.append("taskModuleClassName=" + taskModuleClassName
+    sb.append("processorName=" + processorName
         + ", inputSpecListSize=" + inputSpecList.size()
         + ", outputSpecListSize=" + outputSpecList.size());
     sb.append(", inputSpecList=[");

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java?rev=1471799&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java Thu Apr 25 01:03:37 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.runtime;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.task.RuntimeTask;
+
+/**
+ * Builds the runtime pieces of a task (processor, inputs and outputs) by
+ * reflection from the class names carried in a {@link TezEngineTaskContext}.
+ * Every instantiated class must declare a constructor that takes a single
+ * {@link TezEngineTaskContext}; that constructor need not be public.
+ */
+public class RuntimeUtils {
+
+  private static final Log LOG = LogFactory.getLog(RuntimeUtils.class);
+
+  /** Parameter types of the constructor looked up on every instantiated class. */
+  private static final Class<?>[] CONTEXT_ARRAY =
+      new Class[] { TezEngineTaskContext.class };
+
+  /**
+   * Resolved constructors keyed by class, so the reflective lookup and the
+   * setAccessible call happen once per class. Two threads racing on the same
+   * key are harmless: both store an equivalent constructor.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+    new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  /**
+   * Creates a new instance of {@code theClass} through its constructor that
+   * takes a single {@link TezEngineTaskContext}.
+   *
+   * @param theClass class to instantiate
+   * @param context task context handed to the constructor
+   * @return the newly constructed instance
+   * @throws RuntimeException if the constructor is missing or inaccessible,
+   *         or if it throws; the message names {@code theClass} and the cause
+   *         is the constructor's own exception when it threw one
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T getNewInstance(Class<T> theClass,
+      TezEngineTaskContext context) {
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(CONTEXT_ARRAY);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      return meth.newInstance(context);
+    } catch (InvocationTargetException e) {
+      // Report what the constructor itself threw, not the reflective wrapper.
+      throw new RuntimeException("Failed to construct " + theClass.getName(),
+          e.getCause());
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to instantiate " + theClass.getName()
+          + " through a (" + TezEngineTaskContext.class.getName()
+          + ") constructor", e);
+    }
+  }
+
+  /**
+   * Builds a {@link RuntimeTask} that wires together the processor, inputs and
+   * outputs named in {@code taskContext}. Inputs and outputs are created in
+   * spec-list order, one instance per spec.
+   *
+   * @param taskContext names the processor class and carries the input/output
+   *        spec lists; it is also passed to every constructor invoked
+   * @return the assembled task
+   * @throws YarnException if a named class cannot be loaded or does not
+   *         implement the expected Processor/Input/Output interface
+   * @throws RuntimeException if a class cannot be instantiated
+   */
+  public static RuntimeTask createRuntimeTask(
+      TezEngineTaskContext taskContext) {
+    List<InputSpec> inputSpecs = taskContext.getInputSpecList();
+    List<OutputSpec> outputSpecs = taskContext.getOutputSpecList();
+    LOG.info("TaskContext"
+        + ", Processor: " + taskContext.getProcessorName()
+        + ", InputCount=" + inputSpecs.size()
+        + ", OutputCount=" + outputSpecs.size());
+    try {
+      Processor processor = instantiate(taskContext.getProcessorName(),
+          Processor.class, taskContext);
+
+      if (inputSpecs.isEmpty()) {
+        LOG.info("Initializing task with 0 inputs");
+      }
+      Input[] inputs = new Input[inputSpecs.size()];
+      for (int i = 0; i < inputs.length; ++i) {
+        InputSpec inSpec = inputSpecs.get(i);
+        LOG.info("XXXX Using Input"
+            + ", index=" + i
+            + ", inputClass=" + inSpec.getInputClassName());
+        inputs[i] = instantiate(inSpec.getInputClassName(), Input.class,
+            taskContext);
+      }
+
+      if (outputSpecs.isEmpty()) {
+        LOG.info("Initializing task with 0 outputs");
+      }
+      Output[] outputs = new Output[outputSpecs.size()];
+      for (int i = 0; i < outputs.length; ++i) {
+        OutputSpec outSpec = outputSpecs.get(i);
+        LOG.info("XXXX Using Output"
+            + ", index=" + i
+            + ", output=" + outSpec.getOutputClassName());
+        outputs[i] = instantiate(outSpec.getOutputClassName(), Output.class,
+            taskContext);
+      }
+
+      return new RuntimeTask(processor, inputs, outputs);
+    } catch (ClassNotFoundException e) {
+      throw new YarnException("Unable to initialize RuntimeTask, context="
+          + taskContext, e);
+    }
+  }
+
+  /**
+   * Loads {@code className}, checks that it implements {@code expectedType},
+   * and only then instantiates it through its TezEngineTaskContext constructor.
+   *
+   * @throws ClassNotFoundException if {@code className} cannot be loaded
+   * @throws YarnException if the loaded class is not an {@code expectedType}
+   */
+  private static <T> T instantiate(String className, Class<T> expectedType,
+      TezEngineTaskContext taskContext) throws ClassNotFoundException {
+    Class<?> clazz = Class.forName(className);
+    if (!expectedType.isAssignableFrom(clazz)) {
+      // Fail before running the constructor, with a descriptive error.
+      throw new YarnException("Class " + className
+          + " should be an instance of " + expectedType.getName());
+    }
+    return expectedType.cast(getNewInstance(clazz, taskContext));
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java Thu Apr 25 01:03:37 2013
@@ -26,11 +26,7 @@ import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.api.Task;
 
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-public class RuntimeTask 
-implements Task {
+public class RuntimeTask implements Task {
 
   private final Input[] inputs;
   private final Output[] outputs;
@@ -39,16 +35,6 @@ implements Task {
   private Configuration conf;
   private Master master;
   
-  @Inject
-  public RuntimeTask(
-      @Assisted Processor processor,
-      @Assisted Input in,
-      @Assisted Output out) {
-    this(processor,
-        new Input[] {in},
-        new Output[] {out});
-  }
-
   public RuntimeTask(
       Processor processor,
       Input[] inputs,

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Apr 25 01:03:37 2013
@@ -63,29 +63,35 @@ import org.apache.hadoop.util.Reflection
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.hadoop.ContainerContext;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
 import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
 import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
-import org.apache.tez.mapreduce.task.InitialTask;
-import org.apache.tez.mapreduce.task.LocalFinalTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
@@ -241,11 +247,15 @@ public class LocalJobRunner implements C
           TezTaskAttemptID tezMapId =
               IDConverter.fromMRTaskAttemptId(mapId);
           mapIds.add(mapId);
+          // FIXME invalid task context
           TezEngineTaskContext taskContext = 
               new TezEngineTaskContext(
                   tezMapId, user, localConf.getJobName(), "TODO_vertexName",
-                  InitialTask.class.getName(), null, null);
-          Injector injector = Guice.createInjector(new InitialTask());
+                  MapProcessor.class.getName(),
+                  Collections.singletonList(new InputSpec("srcVertex", 0,
+                      SimpleInput.class.getName())),
+                  Collections.singletonList(new OutputSpec("tgtVertex", 0,
+                      LocalOnFileSorterOutput.class.getName())));
 
           TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
           mapOutput.setConf(localConf);
@@ -254,8 +264,7 @@ public class LocalJobRunner implements C
           try {
             map_tasks.getAndIncrement();
             myMetrics.launchMap(mapId);
-            TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
-            Task t = factory.createTask(taskContext);
+            Task t = RuntimeUtils.createRuntimeTask(taskContext);
             t.initialize(localConf, Job.this);
             t.run();
             myMetrics.completeMap(mapId);
@@ -442,15 +451,16 @@ public class LocalJobRunner implements C
             localConf.set("mapreduce.jobtracker.address", "local");
             localConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, mapIds.size());
             setupChildMapredLocalDirs(reduceId, user, localConf);
-            
-            
-            
+
+            // FIXME invalid task context
             TezEngineTaskContext taskContext = new TezEngineTaskContext(
                 IDConverter.fromMRTaskAttemptId(reduceId), user,
-                localConf.getJobName(), "TODO_vertexName", LocalFinalTask.class.getName(),
+                localConf.getJobName(), "TODO_vertexName",
+                ReduceProcessor.class.getName(),
                 Collections.singletonList(new InputSpec("TODO_srcVertexName",
-                    mapIds.size(), LocalMergedInput.class.getName())), null);
-            Injector injector = Guice.createInjector(new LocalFinalTask());
+                    mapIds.size(), LocalMergedInput.class.getName())),
+                Collections.singletonList(new OutputSpec("TODO_targetVertex",
+                    0, SimpleOutput.class.getName())));
 
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
@@ -480,8 +490,7 @@ public class LocalJobRunner implements C
             if (!this.isInterrupted()) {
               reduce_tasks += 1;
               myMetrics.launchReduce(reduceId);
-              TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
-              Task t = factory.createTask(taskContext);
+              Task t = RuntimeUtils.createRuntimeTask(taskContext);
               t.initialize(localConf, Job.this);
               t.run();
               myMetrics.completeReduce(reduceId);

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java Thu Apr 25 01:03:37 2013
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.doRetu
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -38,17 +39,20 @@ import org.apache.hadoop.mapred.InputSpl
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.api.Task;
-import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
 import org.apache.tez.mapreduce.input.SimpleInput;
-import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
 
 public class MapUtils {
 
@@ -91,20 +95,21 @@ public class MapUtils {
 
   public static Task runMapProcessor(FileSystem fs, Path workDir,
       JobConf jobConf,
-      int mapId, Path mapInput, AbstractModule taskModule,
-      TezTaskUmbilicalProtocol umbilical) 
-      throws Exception {
+      int mapId, Path mapInput,
+      TezTaskUmbilicalProtocol umbilical,
+      Class<?> outputClazz) throws Exception {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
     InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
     TezEngineTaskContext taskContext = 
         new TezEngineTaskContext(
         TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "tez",
-        "tez", "TODO_vertexName", InitialTaskWithLocalSort.class.getName(),
-        null, null);
-  
-    Injector injector = Guice.createInjector(taskModule);
-    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
-    Task t = factory.createTask(taskContext);
+        "tez", "TODO_vertexName", MapProcessor.class.getName(),
+        Collections.singletonList(new InputSpec("srcVertex", 0,
+            SimpleInput.class.getName())),
+        Collections.singletonList(new OutputSpec("targetVertex", 0,
+            outputClazz.getName())));
+
+    Task t = RuntimeUtils.createRuntimeTask(taskContext);
     t.initialize(jobConf, umbilical);
     SimpleInput[] real = ((SimpleInput[])t.getInputs());
     SimpleInput[] inputs = spy(real);

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java Thu Apr 25 01:03:37 2013
@@ -35,10 +35,9 @@ import org.apache.tez.engine.common.sort
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.processor.MapUtils;
-import org.apache.tez.mapreduce.task.InitialTaskWithInMemSort;
-import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
 import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.TruncatedChannelBuffer;
@@ -88,7 +87,8 @@ public class TestMapProcessor {
     localFs.delete(workDir, true);
     MapUtils.runMapProcessor(
         localFs, workDir, job, 0, new Path(workDir, "map0"), 
-        new InitialTaskWithLocalSort(), new TestUmbilicalProtocol()).close();
+        new TestUmbilicalProtocol(),
+        LocalOnFileSorterOutput.class).close();
 
     Path mapOutputFile = mapOutputs.getInputFile(0);
     LOG.info("mapOutputFile = " + mapOutputFile);
@@ -123,7 +123,8 @@ public class TestMapProcessor {
     Task t =
         MapUtils.runMapProcessor(
             localFs, workDir, job, 0, new Path(workDir, "map0"), 
-        new InitialTaskWithInMemSort(), new TestUmbilicalProtocol(true));
+            new TestUmbilicalProtocol(true),
+            InMemorySortedOutput.class);
     InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
     
     verifyInMemSortedStream(outputs[0], 0, 4096);

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java Thu Apr 25 01:03:37 2013
@@ -29,24 +29,26 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.output.SimpleOutput;
 import org.apache.tez.mapreduce.processor.MapUtils;
-import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
-import org.apache.tez.mapreduce.task.LocalFinalTask;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 
 public class TestReduceProcessor {
   
@@ -87,8 +89,8 @@ public class TestReduceProcessor {
     // Run a map
     MapUtils.runMapProcessor(
         localFs, workDir, job, 0, new Path(workDir, "map0"), 
-        new InitialTaskWithLocalSort(), new TestUmbilicalProtocol()
-        );
+        new TestUmbilicalProtocol(),
+        LocalOnFileSorterOutput.class);
 
     LOG.info("Starting reduce...");
     FileOutputFormat.setOutputPath(job, new Path(workDir, "output"));
@@ -96,13 +98,15 @@ public class TestReduceProcessor {
     // Now run a reduce
     TezEngineTaskContext taskContext = new TezEngineTaskContext(
         TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0), "tez",
-        "tez", "TODO_vertexName", LocalFinalTask.class.getName(),
+        "tez", "TODO_vertexName", ReduceProcessor.class.getName(),
         Collections.singletonList(new InputSpec("TODO_srcVertexName", 1,
-            LocalMergedInput.class.getName())), null);
-    job.set(JobContext.TASK_ATTEMPT_ID, taskContext.getTaskAttemptId().toString());
-    Injector injector = Guice.createInjector(new LocalFinalTask());
-    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
-    Task t = factory.createTask(taskContext);
+            LocalMergedInput.class.getName())),
+        Collections.singletonList(new OutputSpec("TODO_targetVertexName", 1,
+                SimpleOutput.class.getName())));
+            
+    job.set(JobContext.TASK_ATTEMPT_ID,
+        taskContext.getTaskAttemptId().toString());
+    Task t = RuntimeUtils.createRuntimeTask(taskContext);
     t.initialize(job, new TestUmbilicalProtocol());
     t.run();
     t.close();

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1471799&r1=1471798&r2=1471799&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Thu Apr 25 01:03:37 2013
@@ -514,7 +514,7 @@ public class YARNRunner implements Clien
     // Intermediate vertices start at 1.
     Vertex vertex = new Vertex(
         MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum),
-        "org.apache.tez.mapreduce.task.IntermediateTask", numTasks);
+        "org.apache.tez.mapreduce.processor.reduce.ReduceProcessor", numTasks);
     
     Map<String, String> reduceEnv = new HashMap<String, String>();
     setupMapReduceEnv(conf, reduceEnv, false);
@@ -564,8 +564,6 @@ public class YARNRunner implements Clien
     int intermediateReduces = jobConf.getInt(
         MRJobConfig.MRR_INTERMEDIATE_STAGES, 0);
 
-    boolean mapOnly =
-        (numMaps > 0 && numReduces == 0 && intermediateReduces == 0);
     boolean isMRR = (intermediateReduces > 0);
 
     LOG.info("XXXX Parsing job config"
@@ -574,10 +572,10 @@ public class YARNRunner implements Clien
         + ", intermediateReduces=" + intermediateReduces);
 
     // configure map vertex
-    String mapProcessor = mapOnly ?
-        "org.apache.tez.mapreduce.task.MapOnlyTask"
-        : "org.apache.tez.mapreduce.task.InitialTask";
-    Vertex mapVertex = new Vertex(MultiStageMRConfigUtil.getInitialMapVertexName(), mapProcessor,
numMaps);
+    String mapProcessor = "org.apache.tez.mapreduce.processor.map.MapProcessor";
+    Vertex mapVertex = new Vertex(
+        MultiStageMRConfigUtil.getInitialMapVertexName(),
+        mapProcessor, numMaps);
 
     // FIXME set up map environment
     Map<String, String> mapEnv = new HashMap<String, String>();
@@ -617,8 +615,10 @@ public class YARNRunner implements Clien
     // configure final reduce vertex
     if (numReduces > 0) {
       String reduceProcessor =
-          "org.apache.tez.mapreduce.task.FinalTask";
-      Vertex reduceVertex = new Vertex(MultiStageMRConfigUtil.getFinalReduceVertexName(),
reduceProcessor, numReduces);
+          "org.apache.tez.mapreduce.processor.reduce.ReduceProcessor";
+      Vertex reduceVertex = new Vertex(
+          MultiStageMRConfigUtil.getFinalReduceVertexName(),
+          reduceProcessor, numReduces);
 
       // FIXME set up reduce environment
       Map<String, String> reduceEnv = new HashMap<String, String>();



Mime
View raw message