tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs (jeagles)
Date Thu, 14 Apr 2016 20:44:41 GMT
Repository: tez
Updated Branches:
  refs/heads/master a99867786 -> 4675a651f


  TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization
relative to Inputs/Outputs (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4675a651
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4675a651
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4675a651

Branch: refs/heads/master
Commit: 4675a651f874c223cb5968ec39704bc9ba973507
Parents: a998677
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu Apr 14 15:43:53 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Apr 14 15:44:05 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/tez/dag/api/TezConfiguration.java    | 22 ++++++++++++++++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 21 ++++++++++++++++---
 3 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4675a651/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46ad9b6..2ed4091 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
   TEZ-3202. Reduce the memory need for jobs with high number of segments
+  TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
 
 Release 0.8.3: 2016-04-14 
 
@@ -434,6 +435,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
   TEZ-3202. Reduce the memory need for jobs with high number of segments
   TEZ-3188. Move tez.submit.hosts out of TezConfiguration to TezConfigurationConstants.
   TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle

http://git-wip-us.apache.org/repos/asf/tez/blob/4675a651/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0bbe1df..6785405 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -731,6 +731,28 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000;
 
   /**
+   * Boolean value. Backwards compatibility setting for initializing IO processor before
+   * inputs and outputs.
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_TASK_INITIALIZE_PROCESSOR_FIRST = TEZ_TASK_PREFIX +
+      "initialize-processor-first";
+  public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT = false;
+
+  /**
+   * Boolean value. Backwards compatibility setting for initializing inputs and outputs
+   * serially instead of the parallel default.
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY = TEZ_TASK_PREFIX +
+      "initialize-processor-io-serially";
+  public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false;
+
+  /**
    * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output
    * components need to make successive progress notifications. If the progress is not notified
    * for this interval then the task will be considered hung and terminated.

http://git-wip-us.apache.org/repos/asf/tez/blob/4675a651/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index a31136b..0863e65 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -156,6 +156,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final HadoopShim hadoopShim;
   private final int maxEventBacklog;
 
+  private final boolean initializeProcessorFirst;
+  private final boolean initializeProcessorIOSerially;
+
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
@@ -189,8 +192,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
     this.state.set(State.NEW);
     this.appAttemptNumber = appAttemptNumber;
+    this.initializeProcessorFirst = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST,
+        TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT);
+    this.initializeProcessorIOSerially = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY,
+        TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT);
     int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread.
     numInitializers = (numInitializers == 0 ? 1 : numInitializers);
+    if (initializeProcessorIOSerially) {
+      numInitializers = 1;
+    }
     this.initializerExecutor = Executors.newFixedThreadPool(
         numInitializers,
         new ThreadFactoryBuilder().setDaemon(true)
@@ -219,6 +229,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.processorContext = createProcessorContext();
     this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);
 
+    if (initializeProcessorFirst || initializeProcessorIOSerially) {
+      // Initialize processor in the current thread.
+      initializeLogicalIOProcessor();
+    }
     int numTasks = 0;
 
     int inputIndex = 0;
@@ -235,9 +249,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       numTasks++;
     }
 
-    // Initialize processor in the current thread.
-    initializeLogicalIOProcessor();
-
+    if (!(initializeProcessorFirst || initializeProcessorIOSerially)) {
+      // Initialize processor in the current thread.
+      initializeLogicalIOProcessor();
+    }
     int completedTasks = 0;
     while (completedTasks < numTasks) {
       LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");


Mime
View raw message