tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [01/14] tez git commit: TEZ-2169. Add NDC context to various threads and pools. Contributed by Sergey Shelukhin.
Date Thu, 05 Mar 2015 02:20:14 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 ec80eab36 -> 45e5311d9 (forced update)


TEZ-2169. Add NDC context to various threads and pools. Contributed by
Sergey Shelukhin.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/09ffc24c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/09ffc24c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/09ffc24c

Branch: refs/heads/TEZ-2003
Commit: 09ffc24cfdd4396b1883b00de2ca97046ca7193d
Parents: 728dc66
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Mar 4 18:18:11 2015 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Mar 4 18:18:11 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/CallableWithNdc.java  | 43 ++++++++++++++++++++
 .../org/apache/tez/common/RunnableWithNdc.java  | 42 +++++++++++++++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 18 ++++----
 .../tez/runtime/task/ContainerReporter.java     |  5 ++-
 .../apache/tez/runtime/task/TezTaskRunner.java  |  6 +--
 .../runtime/library/common/shuffle/Fetcher.java |  5 ++-
 .../common/shuffle/impl/ShuffleManager.java     |  5 ++-
 .../common/shuffle/orderedgrouped/Shuffle.java  |  5 ++-
 .../common/sort/impl/PipelinedSorter.java       |  6 ++-
 .../writers/UnorderedPartitionedKVWriter.java   | 11 +++--
 11 files changed, 123 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e19c4bf..3e9f959 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2169. Add NDC context to various threads and pools.
   TEZ-2171. Remove unused metrics code.
   TEZ-2001. Support pipelined data transfer for ordered output.
   TEZ-2170. Incorrect its in README.md.

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-common/src/main/java/org/apache/tez/common/CallableWithNdc.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/CallableWithNdc.java b/tez-common/src/main/java/org/apache/tez/common/CallableWithNdc.java
new file mode 100644
index 0000000..2d5e1dd
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/CallableWithNdc.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.util.Stack;
+import java.util.concurrent.Callable;
+
+import org.apache.log4j.NDC;
+
+public abstract class CallableWithNdc<T> implements Callable<T> {
+  private final Stack ndcStack;
+
+  public CallableWithNdc() {
+    ndcStack = NDC.cloneStack();
+  }
+
+  @Override
+  public final T call() throws Exception {
+    NDC.inherit(ndcStack);
+    try {
+      return callInternal();
+    } finally {
+      NDC.clear();
+    }
+  }
+
+  protected abstract T callInternal() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-common/src/main/java/org/apache/tez/common/RunnableWithNdc.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/RunnableWithNdc.java b/tez-common/src/main/java/org/apache/tez/common/RunnableWithNdc.java
new file mode 100644
index 0000000..4d718d4
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/RunnableWithNdc.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.util.Stack;
+
+import org.apache.log4j.NDC;
+
+public abstract class RunnableWithNdc implements Runnable {
+  private final Stack ndcStack;
+
+  public RunnableWithNdc() {
+    ndcStack = NDC.cloneStack();
+  }
+
+  @Override
+  public final void run() {
+    NDC.inherit(ndcStack);
+    try {
+      runInternal();
+    } finally {
+      NDC.clear();
+    }
+  }
+
+  protected abstract void runInternal();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 9d9be37..a0b48f0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -42,7 +42,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.RunnableWithNdc;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -362,7 +364,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
-  private class InitializeInputCallable implements Callable<Void> {
+  private class InitializeInputCallable extends CallableWithNdc<Void> {
 
     private final InputSpec inputSpec;
     private final int inputIndex;
@@ -373,7 +375,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
 
     @Override
-    public Void call() throws Exception {
+    protected Void callInternal() throws Exception {
       LOG.info("Initializing Input using InputSpec: " + inputSpec);
       String edgeName = inputSpec.getSourceVertexName();
       InputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex);
@@ -392,7 +394,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
-  private static class StartInputCallable implements Callable<Void> {
+  private static class StartInputCallable extends CallableWithNdc<Void> {
     private final LogicalInput input;
     private final String srcVertexName;
 
@@ -402,7 +404,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
 
     @Override
-    public Void call() throws Exception {
+    protected Void callInternal() throws Exception {
       LOG.info("Starting Input with src edge: " + srcVertexName);
       input.start();
       LOG.info("Started Input with src edge: " + srcVertexName);
@@ -410,7 +412,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
-  private class InitializeOutputCallable implements Callable<Void> {
+  private class InitializeOutputCallable extends CallableWithNdc<Void> {
 
     private final OutputSpec outputSpec;
     private final int outputIndex;
@@ -421,7 +423,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
 
     @Override
-    public Void call() throws Exception {
+    protected Void callInternal() throws Exception {
       LOG.info("Initializing Output using OutputSpec: " + outputSpec);
       String edgeName = outputSpec.getDestinationVertexName();
       OutputContext outputContext = createOutputContext(outputSpec, outputIndex);
@@ -662,8 +664,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private void startRouterThread() {
-    eventRouterThread = new Thread(new Runnable() {
-      public void run() {
+    eventRouterThread = new Thread(new RunnableWithNdc() {
+      public void runInternal() {
         while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
           try {
             TezEvent e = eventsToBeProcessed.take();

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
index a68c7c1..c08d95e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
@@ -31,7 +32,7 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol;
  * functionality is to poll for new tasks.
  * 
  */
-public class ContainerReporter implements Callable<ContainerTask> {
+public class ContainerReporter extends CallableWithNdc<ContainerTask> {
 
   private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
 
@@ -50,7 +51,7 @@ public class ContainerReporter implements Callable<ContainerTask>
{
   }
 
   @Override
-  public ContainerTask call() throws Exception {
+  protected ContainerTask callInternal() throws Exception {
     ContainerTask containerTask = null;
     LOG.info("Attempting to fetch new task");
     containerTask = umbilical.getTask(containerContext);

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 6606481..e39be37 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -163,10 +164,9 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     return true;
   }
 
-  private class TaskRunnerCallable implements Callable<Void> {
-
+  private class TaskRunnerCallable extends CallableWithNdc<Void> {
     @Override
-    public Void call() throws Exception {
+    protected Void callInternal() throws Exception {
       try {
         return ugi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 68951d5..30dad46 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -68,7 +69,7 @@ import com.google.common.base.Preconditions;
  * Responsible for fetching inputs served by the ShuffleHandler for a single
  * host. Construct using {@link FetcherBuilder}
  */
-public class Fetcher implements Callable<FetchResult> {
+public class Fetcher extends CallableWithNdc<FetchResult> {
 
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
 
@@ -161,7 +162,7 @@ public class Fetcher implements Callable<FetchResult> {
   }
 
   @Override
-  public FetchResult call() throws Exception {
+  protected FetchResult callInternal() throws Exception {
     boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);
 
     if (srcAttempts.size() == 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 3dc8156..fc42e3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -255,7 +256,7 @@ public class ShuffleManager implements FetcherCallback {
     schedulerExecutor.shutdown();
   }
   
-  private class RunShuffleCallable implements Callable<Void> {
+  private class RunShuffleCallable extends CallableWithNdc<Void> {
 
     private final Configuration conf;
 
@@ -264,7 +265,7 @@ public class ShuffleManager implements FetcherCallback {
     }
 
     @Override
-    public Void call() throws Exception {
+    protected Void callInternal() throws Exception {
       while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
         lock.lock();
         try {

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 138ecb9..f7b45a7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -325,9 +326,9 @@ public class Shuffle implements ExceptionReporter {
   }
 
   // Not handling any shutdown logic here. That's handled by the callback from this invocation.
-  private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
+  private class RunShuffleCallable extends CallableWithNdc<TezRawKeyValueIterator>
{
     @Override
-    public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+    protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException
{
 
       synchronized (this) {
         for (int i = 0; i < numFetchers; ++i) {

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 91fe661..d36053c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
 import org.apache.hadoop.io.RawComparator;
@@ -898,7 +899,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
-  private static class SortTask implements Callable<SpanIterator> {
+  private static class SortTask extends CallableWithNdc<SpanIterator> {
     private final SortSpan sortable;
     private final IndexedSorter sorter;
 
@@ -907,7 +908,8 @@ public class PipelinedSorter extends ExternalSorter {
         this.sorter = sorter;
     }
 
-    public SpanIterator call() {
+    @Override
+    protected SpanIterator callInternal() {
       return sortable.sort(sorter);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/09ffc24c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 33c2122..be128a9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -311,7 +312,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   }
 
   // All spills using compression for now.
-  private class SpillCallable implements Callable<SpillResult> {
+  private class SpillCallable extends CallableWithNdc<SpillResult> {
 
     private final WrappedBuffer wrappedBuffer;
     private final CompressionCodec codec;
@@ -329,7 +330,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
 
     @Override
-    public SpillResult call() throws IOException {
+    protected SpillResult callInternal() throws IOException {
       // This should not be called with an empty buffer. Check before invoking.
 
       // Number of parallel spills determined by number of threads.
@@ -506,7 +507,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     } else {
       updateGlobalStats(currentBuffer);
       SpillCallable spillCallable = new SpillCallable(currentBuffer, 0, codec, null, true);
-      spillCallable.call();
+      try {
+        spillCallable.call();
+      } catch (Exception ex) {
+        throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+      }
       return;
     }
 


Mime
View raw message