Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8F76710762 for ; Tue, 15 Oct 2013 23:10:43 +0000 (UTC) Received: (qmail 94545 invoked by uid 500); 15 Oct 2013 23:10:43 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 94526 invoked by uid 500); 15 Oct 2013 23:10:43 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 94519 invoked by uid 99); 15 Oct 2013 23:10:43 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 23:10:43 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 15 Oct 2013 23:10:40 +0000 Received: (qmail 94207 invoked by uid 99); 15 Oct 2013 23:10:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 23:10:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 74F1F8B5691; Tue, 15 Oct 2013 23:10:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: <22beece1215f497c837a6ff940d363c9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-486. Parallelize Input/Output/Processor initialization post TEZ-396 changes. (sseth) Date: Tue, 15 Oct 2013 23:10:18 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 59539dde2 -> 9542bad46 TEZ-486. Parallelize Input/Output/Processor initialization post TEZ-396 changes. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9542bad4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9542bad4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9542bad4 Branch: refs/heads/master Commit: 9542bad469ed178185f7e5572676e3879b8186bd Parents: 59539dd Author: Siddharth Seth Authored: Tue Oct 15 16:09:55 2013 -0700 Committer: Siddharth Seth Committed: Tue Oct 15 16:09:55 2013 -0700 ---------------------------------------------------------------------- .../dag/app/dag/RootInputInitializerRunner.java | 2 +- .../processor/map/TestMapProcessor.java | 2 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 293 +++++++++++-------- 3 files changed, 178 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9542bad4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java index ab78d1b..62149aa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java @@ -63,7 +63,7 @@ public class RootInputInitializerRunner { this.vertexID = vertexID; this.eventHandler = eventHandler; this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("InputInitializer #%d").build()); + .setDaemon(true).setNameFormat("InputInitializer [" + this.vertexName + "] #%d").build()); this.executor = MoreExecutors.listeningDecorator(rawExecutor); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9542bad4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 2593318..d3204fe 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -137,7 +137,7 @@ public class TestMapProcessor { task.run(); task.close(); - TezInputContext inputContext = task.getInputContexts().get(0); + TezInputContext inputContext = task.getInputContexts().iterator().next(); TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, inputContext.getUniqueIdentifier()); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9542bad4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index f9852a1..0cfad93 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -26,6 +26,14 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -59,6 +67,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; @Private public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @@ -66,23 +75,27 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private static final Log LOG = LogFactory .getLog(LogicalIOProcessorRuntimeTask.class); + /** Responsible for maintaining order of Inputs */ private final List inputSpecs; - private final List inputs; - + private final ConcurrentHashMap inputsMap; + private final ConcurrentHashMap inputContextMap; + /** Responsible for maintaining order of Outputs */ private final List outputSpecs; - private final List outputs; - - private List inputContexts; - private List outputContexts; - private TezProcessorContext processorContext; + private final ConcurrentHashMap outputsMap; + private final ConcurrentHashMap outputContextMap; private final ProcessorDescriptor processorDescriptor; private final LogicalIOProcessor processor; + private TezProcessorContext processorContext; + /** Maps which will be provided to the processor run method */ + private final LinkedHashMap runInputMap; + private final LinkedHashMap runOutputMap; + private final Map serviceConsumerMetadata; - - private Map inputMap; - private Map outputMap; + + private final ExecutorService initializerExecutor; + private final CompletionService initializerCompletionService; private LinkedBlockingQueue eventsToBeProcessed; private Thread eventRouterThread = null; @@ -96,46 +109,85 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { super(taskSpec, tezConf, tezUmbilical); LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec); - this.inputContexts = new ArrayList(taskSpec.getInputs().size()); - this.outputContexts = new ArrayList(taskSpec.getOutputs().size()); + int numInputs = taskSpec.getInputs().size(); + int numOutputs = taskSpec.getOutputs().size(); this.inputSpecs = taskSpec.getInputs(); - this.inputs = createInputs(inputSpecs); + this.inputsMap = new ConcurrentHashMap(numInputs); + this.inputContextMap = new ConcurrentHashMap(numInputs); this.outputSpecs = taskSpec.getOutputs(); - this.outputs = createOutputs(outputSpecs); + this.outputsMap = new ConcurrentHashMap(numOutputs); + this.outputContextMap = new ConcurrentHashMap(numOutputs); + + this.runInputMap = new LinkedHashMap(); + this.runOutputMap = new LinkedHashMap(); + this.processorDescriptor = taskSpec.getProcessorDescriptor(); this.processor = createProcessor(processorDescriptor); this.serviceConsumerMetadata = serviceConsumerMetadata; this.eventsToBeProcessed = new LinkedBlockingQueue(); this.state = State.NEW; this.appAttemptNumber = appAttemptNumber; + int numInitializers = numInputs + numOutputs; + this.initializerExecutor = Executors.newFixedThreadPool( + numInitializers, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Initializer %d").build()); + this.initializerCompletionService = new ExecutorCompletionService( + this.initializerExecutor); } public void initialize() throws Exception { LOG.info("Initializing LogicalProcessorIORuntimeTask"); Preconditions.checkState(this.state == State.NEW, "Already initialized"); this.state = State.INITED; - inputMap = new LinkedHashMap(inputs.size()); - outputMap = new LinkedHashMap(outputs.size()); - - // TODO Maybe close initialized inputs / outputs in case of failure to - // initialize. - // Initialize all inputs. TODO: Multi-threaded at some point. - for (int i = 0; i < inputs.size(); i++) { - String srcVertexName = inputSpecs.get(i).getSourceVertexName(); - initializeInput(inputs.get(i), - inputSpecs.get(i)); - inputMap.put(srcVertexName, inputs.get(i)); + + int numTasks = 0; + + for (InputSpec inputSpec : taskSpec.getInputs()) { + this.initializerCompletionService.submit(new InitializeInputCallable(inputSpec)); + numTasks++; } - - // Initialize all outputs. TODO: Multi-threaded at some point. - for (int i = 0; i < outputs.size(); i++) { - String destVertexName = outputSpecs.get(i).getDestinationVertexName(); - initializeOutput(outputs.get(i), outputSpecs.get(i)); - outputMap.put(destVertexName, outputs.get(i)); + + for (OutputSpec outputSpec : taskSpec.getOutputs()) { + this.initializerCompletionService.submit(new InitializeOutputCallable(outputSpec)); + numTasks++; } - - // Initialize processor. + // Shutdown after all tasks complete. + this.initializerExecutor.shutdown(); + + // Initialize processor in the current thread. initializeLogicalIOProcessor(); + + int completedTasks = 0; + while (completedTasks < numTasks) { + LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish"); + Future future = initializerCompletionService.take(); + try { + future.get(); + completedTasks++; + } catch (ExecutionException e) { + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } else { + throw new Exception(e); + } + } + } + LOG.info("All initializers finished"); + + // Construct Inputs/Outputs map argument for processor.run() + for (InputSpec inputSpec : inputSpecs) { + LogicalInput input = inputsMap.get(inputSpec.getSourceVertexName()); + runInputMap.put(inputSpec.getSourceVertexName(), input); + } + for (OutputSpec outputSpec : outputSpecs) { + LogicalOutput output = outputsMap.get(outputSpec.getDestinationVertexName()); + runOutputMap.put(outputSpec.getDestinationVertexName(), output); + } + + // TODO Maybe close initialized inputs / outputs in case of failure to + // initialize. + startRouterThread(); } @@ -146,7 +198,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.state = State.RUNNING; } LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor; - lioProcessor.run(inputMap, outputMap); + lioProcessor.run(runInputMap, runOutputMap); } public void close() throws Exception { @@ -156,9 +208,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.state = State.CLOSED; // Close the Inputs. - for (int i = 0; i < inputs.size(); i++) { - String srcVertexName = inputSpecs.get(i).getSourceVertexName(); - List closeInputEvents = inputs.get(i).close(); + for (InputSpec inputSpec : inputSpecs) { + String srcVertexName = inputSpec.getSourceVertexName(); + List closeInputEvents = inputsMap.get(srcVertexName).close(); sendTaskGeneratedEvents(closeInputEvents, EventProducerConsumerType.INPUT, taskSpec.getVertexName(), srcVertexName, taskSpec.getTaskAttemptID()); @@ -168,9 +220,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { processor.close(); // Close the Outputs. - for (int i = 0; i < outputs.size(); i++) { - String destVertexName = outputSpecs.get(i).getDestinationVertexName(); - List closeOutputEvents = outputs.get(i).close(); + for (OutputSpec outputSpec : outputSpecs) { + String destVertexName = outputSpec.getDestinationVertexName(); + List closeOutputEvents = outputsMap.get(destVertexName).close(); sendTaskGeneratedEvents(closeOutputEvents, EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), destVertexName, taskSpec.getTaskAttemptID()); @@ -183,43 +235,76 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } } - private void initializeInput(Input input, InputSpec inputSpec) - throws Exception { - TezInputContext tezInputContext = createInputContext(inputSpec); - inputContexts.add(tezInputContext); - if (input instanceof LogicalInput) { - ((LogicalInput) input).setNumPhysicalInputs(inputSpec - .getPhysicalEdgeCount()); + private class InitializeInputCallable implements Callable { + + private final InputSpec inputSpec; + + public InitializeInputCallable(InputSpec inputSpec) { + this.inputSpec = inputSpec; + } + + @Override + public Void call() throws Exception { + LOG.info("Initializing Input using InputSpec: " + inputSpec); + String edgeName = inputSpec.getSourceVertexName(); + LogicalInput input = createInput(inputSpec); + TezInputContext inputContext = createInputContext(inputSpec); + inputsMap.put(edgeName, input); + inputContextMap.put(edgeName, inputContext); + + if (input instanceof LogicalInput) { + ((LogicalInput) input).setNumPhysicalInputs(inputSpec + .getPhysicalEdgeCount()); + } + LOG.info("Initializing Input with src edge: " + edgeName); + List events = input.initialize(inputContext); + sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, + inputContext.getTaskVertexName(), inputContext.getSourceVertexName(), + taskSpec.getTaskAttemptID()); + LOG.info("Initialized Input with src edge: " + edgeName); + return null; } - LOG.info("Initializing Input using InputSpec: " + inputSpec); - List events = input.initialize(tezInputContext); - sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, - tezInputContext.getTaskVertexName(), - tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID()); } - private void initializeOutput(Output output, OutputSpec outputSpec) - throws Exception { - TezOutputContext tezOutputContext = createOutputContext(outputSpec); - outputContexts.add(tezOutputContext); - if (output instanceof LogicalOutput) { - ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec - .getPhysicalEdgeCount()); + private class InitializeOutputCallable implements Callable { + + private final OutputSpec outputSpec; + + public InitializeOutputCallable(OutputSpec outputSpec) { + this.outputSpec = outputSpec; + } + + @Override + public Void call() throws Exception { + LOG.info("Initializing Output using OutputSpec: " + outputSpec); + String edgeName = outputSpec.getDestinationVertexName(); + LogicalOutput output = createOutput(outputSpec); + TezOutputContext outputContext = createOutputContext(outputSpec); + outputsMap.put(edgeName, output); + outputContextMap.put(edgeName, outputContext); + + if (output instanceof LogicalOutput) { + ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec + .getPhysicalEdgeCount()); + } + LOG.info("Initializing Input with dest edge: " + edgeName); + List events = output.initialize(outputContext); + sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT, + outputContext.getTaskVertexName(), + outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); + LOG.info("Initialized Output with dest edge: " + edgeName); + return null; } - LOG.info("Initializing Output using OutputSpec: " + outputSpec); - List events = output.initialize(tezOutputContext); - sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT, - tezOutputContext.getTaskVertexName(), - tezOutputContext.getDestinationVertexName(), - taskSpec.getTaskAttemptID()); } private void initializeLogicalIOProcessor() throws Exception { - LOG.info("Initializing processor" - + ", processorClassName=" + processorDescriptor.getClassName()); + LOG.info("Initializing processor" + ", processorClassName=" + + processorDescriptor.getClassName()); TezProcessorContext processorContext = createProcessorContext(); this.processorContext = processorContext; processor.initialize(processorContext); + LOG.info("Initialized processor" + ", processorClassName=" + + processorDescriptor.getClassName()); } private TezInputContext createInputContext(InputSpec inputSpec) { @@ -254,42 +339,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { return processorContext; } - private List createInputs(List inputSpecs) { - List inputs = new ArrayList(inputSpecs.size()); - for (InputSpec inputSpec : inputSpecs) { - LOG.info("Creating Input from InputSpec: " - + inputSpec); - Input input = RuntimeUtils.createClazzInstance(inputSpec - .getInputDescriptor().getClassName()); - - if (input instanceof LogicalInput) { - inputs.add((LogicalInput) input); - } else { - throw new TezUncheckedException(input.getClass().getName() - + " is not a sub-type of LogicalInput." - + " Only LogicalInput sub-types supported by LogicalIOProcessor."); - } + private LogicalInput createInput(InputSpec inputSpec) { + LOG.info("Creating Input"); + Input input = RuntimeUtils.createClazzInstance(inputSpec + .getInputDescriptor().getClassName()); + if (!(input instanceof LogicalInput)) { + throw new TezUncheckedException(input.getClass().getName() + + " is not a sub-type of LogicalInput." + + " Only LogicalInput sub-types supported by LogicalIOProcessor."); } - return inputs; + return (LogicalInput)input; } - private List createOutputs(List outputSpecs) { - List outputs = new ArrayList( - outputSpecs.size()); - for (OutputSpec outputSpec : outputSpecs) { - LOG.info("Creating Output from OutputSpec" - + outputSpec); - Output output = RuntimeUtils.createClazzInstance(outputSpec - .getOutputDescriptor().getClassName()); - if (output instanceof LogicalOutput) { - outputs.add((LogicalOutput) output); - } else { - throw new TezUncheckedException(output.getClass().getName() - + " is not a sub-type of LogicalOutput." - + " Only LogicalOutput sub-types supported by LogicalIOProcessor."); - } + private LogicalOutput createOutput(OutputSpec outputSpec) { + LOG.info("Creating Output"); + Output output = RuntimeUtils.createClazzInstance(outputSpec + .getOutputDescriptor().getClassName()); + if (!(output instanceof LogicalOutput)) { + throw new TezUncheckedException(output.getClass().getName() + + " is not a sub-type of LogicalOutput." + + " Only LogicalOutput sub-types supported by LogicalIOProcessor."); } - return outputs; + return (LogicalOutput) output; } private LogicalIOProcessor createProcessor( @@ -338,7 +409,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { try { switch (e.getDestinationInfo().getEventGenerator()) { case INPUT: - LogicalInput input = inputMap.get( + LogicalInput input = inputsMap.get( e.getDestinationInfo().getEdgeVertexName()); if (input != null) { input.handleEvents(Collections.singletonList(e.getEvent())); @@ -348,7 +419,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } break; case OUTPUT: - LogicalOutput output = outputMap.get( + LogicalOutput output = outputsMap.get( e.getDestinationInfo().getEdgeVertexName()); if (output != null) { output.handleEvents(Collections.singletonList(e.getEvent())); @@ -432,14 +503,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Private @VisibleForTesting - public List getInputContexts() { - return this.inputContexts; + public Collection getInputContexts() { + return this.inputContextMap.values(); } @Private @VisibleForTesting - public List getOutputContexts() { - return this.outputContexts; + public Collection getOutputContexts() { + return this.outputContextMap.values(); } @Private @@ -450,18 +521,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Private @VisibleForTesting - public Map getInputs() { - return this.inputMap; - } - - @Private - @VisibleForTesting - public Map getOutputs() { - return this.outputMap; - } - - @Private - @VisibleForTesting public LogicalIOProcessor getProcessor() { return this.processor; }