tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-486. Parallelize Input/Output/Processor initialization post TEZ-396 changes. (sseth)
Date Tue, 15 Oct 2013 23:10:18 GMT
Updated Branches:
  refs/heads/master 59539dde2 -> 9542bad46


TEZ-486. Parallelize Input/Output/Processor initialization post TEZ-396
changes. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9542bad4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9542bad4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9542bad4

Branch: refs/heads/master
Commit: 9542bad469ed178185f7e5572676e3879b8186bd
Parents: 59539dd
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Oct 15 16:09:55 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Oct 15 16:09:55 2013 -0700

----------------------------------------------------------------------
 .../dag/app/dag/RootInputInitializerRunner.java |   2 +-
 .../processor/map/TestMapProcessor.java         |   2 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 293 +++++++++++--------
 3 files changed, 178 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9542bad4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
index ab78d1b..62149aa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -63,7 +63,7 @@ public class RootInputInitializerRunner {
     this.vertexID = vertexID;
     this.eventHandler = eventHandler;
     this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("InputInitializer #%d").build());
+        .setDaemon(true).setNameFormat("InputInitializer [" + this.vertexName + "] #%d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9542bad4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 2593318..d3204fe 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -137,7 +137,7 @@ public class TestMapProcessor {
     task.run();
     task.close();
     
-    TezInputContext inputContext = task.getInputContexts().get(0);
+    TezInputContext inputContext = task.getInputContexts().iterator().next();
     TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, inputContext.getUniqueIdentifier());
     
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9542bad4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f9852a1..0cfad93 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -26,6 +26,14 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -59,6 +67,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @Private
 public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@@ -66,23 +75,27 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private static final Log LOG = LogFactory
       .getLog(LogicalIOProcessorRuntimeTask.class);
 
+  /** Responsible for maintaining order of Inputs */
   private final List<InputSpec> inputSpecs;
-  private final List<LogicalInput> inputs;
-
+  private final ConcurrentHashMap<String, LogicalInput> inputsMap;
+  private final ConcurrentHashMap<String, TezInputContext> inputContextMap;
+  /** Responsible for maintaining order of Outputs */
   private final List<OutputSpec> outputSpecs;
-  private final List<LogicalOutput> outputs;
-
-  private List<TezInputContext> inputContexts;
-  private List<TezOutputContext> outputContexts;
-  private TezProcessorContext processorContext;
+  private final ConcurrentHashMap<String, LogicalOutput> outputsMap;
+  private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
   
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
+  private TezProcessorContext processorContext;
 
+  /** Maps which will be provided to the processor run method */
+  private final LinkedHashMap<String, LogicalInput> runInputMap;
+  private final LinkedHashMap<String, LogicalOutput> runOutputMap;
+  
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
-
-  private Map<String, LogicalInput> inputMap;
-  private Map<String, LogicalOutput> outputMap;
+  
+  private final ExecutorService initializerExecutor;
+  private final CompletionService<Void> initializerCompletionService;
 
   private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
   private Thread eventRouterThread = null;
@@ -96,46 +109,85 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     super(taskSpec, tezConf, tezUmbilical);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
         + taskSpec);
-    this.inputContexts = new ArrayList<TezInputContext>(taskSpec.getInputs().size());
-    this.outputContexts = new ArrayList<TezOutputContext>(taskSpec.getOutputs().size());
+    int numInputs = taskSpec.getInputs().size();
+    int numOutputs = taskSpec.getOutputs().size();
     this.inputSpecs = taskSpec.getInputs();
-    this.inputs = createInputs(inputSpecs);
+    this.inputsMap = new ConcurrentHashMap<String, LogicalInput>(numInputs);
+    this.inputContextMap = new ConcurrentHashMap<String, TezInputContext>(numInputs);
     this.outputSpecs = taskSpec.getOutputs();
-    this.outputs = createOutputs(outputSpecs);
+    this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
+    this.outputContextMap = new ConcurrentHashMap<String, TezOutputContext>(numOutputs);
+
+    this.runInputMap = new LinkedHashMap<String, LogicalInput>();
+    this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
+
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
     this.processor = createProcessor(processorDescriptor);
     this.serviceConsumerMetadata = serviceConsumerMetadata;
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
     this.state = State.NEW;
     this.appAttemptNumber = appAttemptNumber;
+    int numInitializers = numInputs + numOutputs;
+    this.initializerExecutor = Executors.newFixedThreadPool(
+        numInitializers,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Initializer %d").build());
+    this.initializerCompletionService = new ExecutorCompletionService<Void>(
+        this.initializerExecutor);
   }
 
   public void initialize() throws Exception {
     LOG.info("Initializing LogicalProcessorIORuntimeTask");
     Preconditions.checkState(this.state == State.NEW, "Already initialized");
     this.state = State.INITED;
-    inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
-    outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
-
-    // TODO Maybe close initialized inputs / outputs in case of failure to
-    // initialize.
-    // Initialize all inputs. TODO: Multi-threaded at some point.
-    for (int i = 0; i < inputs.size(); i++) {
-      String srcVertexName = inputSpecs.get(i).getSourceVertexName();
-      initializeInput(inputs.get(i),
-          inputSpecs.get(i));
-      inputMap.put(srcVertexName, inputs.get(i));
+    
+    int numTasks = 0;
+    
+    for (InputSpec inputSpec : taskSpec.getInputs()) {
+      this.initializerCompletionService.submit(new InitializeInputCallable(inputSpec));
+      numTasks++;
     }
-
-    // Initialize all outputs. TODO: Multi-threaded at some point.
-    for (int i = 0; i < outputs.size(); i++) {
-      String destVertexName = outputSpecs.get(i).getDestinationVertexName();
-      initializeOutput(outputs.get(i), outputSpecs.get(i));
-      outputMap.put(destVertexName, outputs.get(i));
+    
+    for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+      this.initializerCompletionService.submit(new InitializeOutputCallable(outputSpec));
+      numTasks++;
     }
-
-    // Initialize processor.
+    // Shutdown after all tasks complete.
+    this.initializerExecutor.shutdown();
+    
+    // Initialize processor in the current thread.
     initializeLogicalIOProcessor();
+    
+    int completedTasks = 0;
+    while (completedTasks < numTasks) {
+      LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");
+      Future<Void> future = initializerCompletionService.take();
+      try {
+        future.get();
+        completedTasks++;
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof Exception) {
+          throw (Exception) e.getCause();
+        } else {
+          throw new Exception(e);
+        }
+      }
+    }
+    LOG.info("All initializers finished");
+
+    // Construct Inputs/Outputs map argument for processor.run()
+    for (InputSpec inputSpec : inputSpecs) {
+      LogicalInput input = inputsMap.get(inputSpec.getSourceVertexName());
+      runInputMap.put(inputSpec.getSourceVertexName(), input);
+    }
+    for (OutputSpec outputSpec : outputSpecs) {
+      LogicalOutput output = outputsMap.get(outputSpec.getDestinationVertexName());
+      runOutputMap.put(outputSpec.getDestinationVertexName(), output);
+    }
+    
+    // TODO Maybe close initialized inputs / outputs in case of failure to
+    // initialize.
+  
     startRouterThread();
   }
 
@@ -146,7 +198,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       this.state = State.RUNNING;
     }
     LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
-    lioProcessor.run(inputMap, outputMap);
+    lioProcessor.run(runInputMap, runOutputMap);
   }
 
   public void close() throws Exception {
@@ -156,9 +208,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       this.state = State.CLOSED;
 
       // Close the Inputs.
-      for (int i = 0; i < inputs.size(); i++) {
-        String srcVertexName = inputSpecs.get(i).getSourceVertexName();
-        List<Event> closeInputEvents = inputs.get(i).close();
+      for (InputSpec inputSpec : inputSpecs) {
+        String srcVertexName = inputSpec.getSourceVertexName();
+        List<Event> closeInputEvents = inputsMap.get(srcVertexName).close();
         sendTaskGeneratedEvents(closeInputEvents,
             EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
             srcVertexName, taskSpec.getTaskAttemptID());
@@ -168,9 +220,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       processor.close();
 
       // Close the Outputs.
-      for (int i = 0; i < outputs.size(); i++) {
-        String destVertexName = outputSpecs.get(i).getDestinationVertexName();
-        List<Event> closeOutputEvents = outputs.get(i).close();
+      for (OutputSpec outputSpec : outputSpecs) {
+        String destVertexName = outputSpec.getDestinationVertexName();
+        List<Event> closeOutputEvents = outputsMap.get(destVertexName).close();
         sendTaskGeneratedEvents(closeOutputEvents,
             EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
             destVertexName, taskSpec.getTaskAttemptID());
@@ -183,43 +235,76 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
-  private void initializeInput(Input input, InputSpec inputSpec)
-      throws Exception {
-    TezInputContext tezInputContext = createInputContext(inputSpec);
-    inputContexts.add(tezInputContext);
-    if (input instanceof LogicalInput) {
-      ((LogicalInput) input).setNumPhysicalInputs(inputSpec
-          .getPhysicalEdgeCount());
+  private class InitializeInputCallable implements Callable<Void> {
+
+    private final InputSpec inputSpec;
+
+    public InitializeInputCallable(InputSpec inputSpec) {
+      this.inputSpec = inputSpec;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      LOG.info("Initializing Input using InputSpec: " + inputSpec);
+      String edgeName = inputSpec.getSourceVertexName();
+      LogicalInput input = createInput(inputSpec);
+      TezInputContext inputContext = createInputContext(inputSpec);
+      inputsMap.put(edgeName, input);
+      inputContextMap.put(edgeName, inputContext);
+
+      if (input instanceof LogicalInput) {
+        ((LogicalInput) input).setNumPhysicalInputs(inputSpec
+            .getPhysicalEdgeCount());
+      }
+      LOG.info("Initializing Input with src edge: " + edgeName);
+      List<Event> events = input.initialize(inputContext);
+      sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
+          inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
+          taskSpec.getTaskAttemptID());
+      LOG.info("Initialized Input with src edge: " + edgeName);
+      return null;
     }
-    LOG.info("Initializing Input using InputSpec: " + inputSpec);
-    List<Event> events = input.initialize(tezInputContext);
-    sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
-        tezInputContext.getTaskVertexName(),
-        tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
   }
 
-  private void initializeOutput(Output output, OutputSpec outputSpec)
-      throws Exception {
-    TezOutputContext tezOutputContext = createOutputContext(outputSpec);
-    outputContexts.add(tezOutputContext);
-    if (output instanceof LogicalOutput) {
-      ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
-          .getPhysicalEdgeCount());
+  private class InitializeOutputCallable implements Callable<Void> {
+
+    private final OutputSpec outputSpec;
+
+    public InitializeOutputCallable(OutputSpec outputSpec) {
+      this.outputSpec = outputSpec;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      LOG.info("Initializing Output using OutputSpec: " + outputSpec);
+      String edgeName = outputSpec.getDestinationVertexName();
+      LogicalOutput output = createOutput(outputSpec);
+      TezOutputContext outputContext = createOutputContext(outputSpec);
+      outputsMap.put(edgeName, output);
+      outputContextMap.put(edgeName, outputContext);
+
+      if (output instanceof LogicalOutput) {
+        ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
+            .getPhysicalEdgeCount());
+      }
+      LOG.info("Initializing Input with dest edge: " + edgeName);
+      List<Event> events = output.initialize(outputContext);
+      sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+          outputContext.getTaskVertexName(),
+          outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
+      LOG.info("Initialized Output with dest edge: " + edgeName);
+      return null;
     }
-    LOG.info("Initializing Output using OutputSpec: " + outputSpec);
-    List<Event> events = output.initialize(tezOutputContext);
-    sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
-        tezOutputContext.getTaskVertexName(),
-        tezOutputContext.getDestinationVertexName(),
-        taskSpec.getTaskAttemptID());
   }
 
   private void initializeLogicalIOProcessor() throws Exception {
-    LOG.info("Initializing processor"
-        + ", processorClassName=" + processorDescriptor.getClassName());
+    LOG.info("Initializing processor" + ", processorClassName="
+        + processorDescriptor.getClassName());
     TezProcessorContext processorContext = createProcessorContext();
     this.processorContext = processorContext;
     processor.initialize(processorContext);
+    LOG.info("Initialized processor" + ", processorClassName="
+        + processorDescriptor.getClassName());
   }
 
   private TezInputContext createInputContext(InputSpec inputSpec) {
@@ -254,42 +339,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return processorContext;
   }
 
-  private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
-    List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
-    for (InputSpec inputSpec : inputSpecs) {
-      LOG.info("Creating Input from InputSpec: "
-          + inputSpec);
-      Input input = RuntimeUtils.createClazzInstance(inputSpec
-          .getInputDescriptor().getClassName());
-
-      if (input instanceof LogicalInput) {
-        inputs.add((LogicalInput) input);
-      } else {
-        throw new TezUncheckedException(input.getClass().getName()
-            + " is not a sub-type of LogicalInput."
-            + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
-      }
+  private LogicalInput createInput(InputSpec inputSpec) {
+    LOG.info("Creating Input");
+    Input input = RuntimeUtils.createClazzInstance(inputSpec
+        .getInputDescriptor().getClassName());
+    if (!(input instanceof LogicalInput)) {
+      throw new TezUncheckedException(input.getClass().getName()
+          + " is not a sub-type of LogicalInput."
+          + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
     }
-    return inputs;
+    return (LogicalInput)input;
   }
 
-  private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
-    List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
-        outputSpecs.size());
-    for (OutputSpec outputSpec : outputSpecs) {
-      LOG.info("Creating Output from OutputSpec"
-          + outputSpec);
-      Output output = RuntimeUtils.createClazzInstance(outputSpec
-          .getOutputDescriptor().getClassName());
-      if (output instanceof LogicalOutput) {
-        outputs.add((LogicalOutput) output);
-      } else {
-        throw new TezUncheckedException(output.getClass().getName()
-            + " is not a sub-type of LogicalOutput."
-            + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
-      }
+  private LogicalOutput createOutput(OutputSpec outputSpec) {
+    LOG.info("Creating Output");
+    Output output = RuntimeUtils.createClazzInstance(outputSpec
+        .getOutputDescriptor().getClassName());
+    if (!(output instanceof LogicalOutput)) {
+      throw new TezUncheckedException(output.getClass().getName()
+          + " is not a sub-type of LogicalOutput."
+          + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
     }
-    return outputs;
+    return (LogicalOutput) output;
   }
 
   private LogicalIOProcessor createProcessor(
@@ -338,7 +409,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     try {
       switch (e.getDestinationInfo().getEventGenerator()) {
       case INPUT:
-        LogicalInput input = inputMap.get(
+        LogicalInput input = inputsMap.get(
             e.getDestinationInfo().getEdgeVertexName());
         if (input != null) {
           input.handleEvents(Collections.singletonList(e.getEvent()));
@@ -348,7 +419,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         }
         break;
       case OUTPUT:
-        LogicalOutput output = outputMap.get(
+        LogicalOutput output = outputsMap.get(
             e.getDestinationInfo().getEdgeVertexName());
         if (output != null) {
           output.handleEvents(Collections.singletonList(e.getEvent()));
@@ -432,14 +503,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   
   @Private
   @VisibleForTesting
-  public List<TezInputContext> getInputContexts() {
-    return this.inputContexts;
+  public Collection<TezInputContext> getInputContexts() {
+    return this.inputContextMap.values();
   }
   
   @Private
   @VisibleForTesting
-  public List<TezOutputContext> getOutputContexts() {
-    return this.outputContexts;
+  public Collection<TezOutputContext> getOutputContexts() {
+    return this.outputContextMap.values();
   }
 
   @Private
@@ -450,18 +521,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   
   @Private
   @VisibleForTesting
-  public Map<String, LogicalInput> getInputs() {
-    return this.inputMap;
-  }
-  
-  @Private
-  @VisibleForTesting
-  public Map<String, LogicalOutput> getOutputs() {
-    return this.outputMap;
-  }
-  
-  @Private
-  @VisibleForTesting
   public LogicalIOProcessor getProcessor() {
     return this.processor;
   }


Mime
View raw message