tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] tez git commit: TEZ-2775. Improve and consolidate logging in Runtime components. Contributed by Siddharth Seth and Rajesh Balamohan.
Date Mon, 21 Sep 2015 17:36:45 GMT
Repository: tez
Updated Branches:
  refs/heads/master 7ed7025ad -> 983ceeee1


http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 18c8302..0e1fe9f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -144,7 +144,7 @@ public class Shuffle implements ExceptionReporter {
     TezCounter mergedMapOutputsCounter =
         inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
 
-    LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+    LOG.info(srcNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: "
         + (codec == null ? "None" : codec.getClass().getName()) + 
         "ifileReadAhead: " + ifileReadAhead);
 
@@ -190,7 +190,7 @@ public class Shuffle implements ExceptionReporter {
         sslShuffle);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
+        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());
 
 
     executor = MoreExecutors.listeningDecorator(rawExecutor);
@@ -201,7 +201,7 @@ public class Shuffle implements ExceptionReporter {
     if (!isShutDown.get()) {
       eventHandler.handleEvents(events);
     } else {
-      LOG.info("Ignoring events since already shutdown. EventCount: " + events.size());
+      LOG.info(srcNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size());
     }
 
   }
@@ -327,7 +327,7 @@ public class Shuffle implements ExceptionReporter {
       }
 
       inputContext.inputIsReady();
-      LOG.info("merge complete for input vertex : " + inputContext.getSourceVertexName());
+      LOG.info("merge complete for input vertex : " + srcNameTrimmed);
       return kvIter;
     }
   }
@@ -337,7 +337,7 @@ public class Shuffle implements ExceptionReporter {
       cleanupShuffleScheduler();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
+      LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the scheduler during cleanup. Ignoring");
     }
   }
 
@@ -355,13 +355,13 @@ public class Shuffle implements ExceptionReporter {
         if (ignoreErrors) {
           //Reset the status
           Thread.currentThread().interrupt();
-          LOG.info("Interrupted while attempting to close the merger during cleanup. Ignoring");
+          LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the merger during cleanup. Ignoring");
         } else {
           throw e;
         }
       } catch (Throwable e) {
         if (ignoreErrors) {
-          LOG.info("Exception while trying to shutdown merger, Ignoring", e);
+          LOG.info(srcNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e);
         } else {
           throw e;
         }
@@ -371,10 +371,13 @@ public class Shuffle implements ExceptionReporter {
 
   private void cleanupIgnoreErrors() {
     try {
+      if (eventHandler != null) {
+        eventHandler.logProgress(true);
+      }
       cleanupShuffleSchedulerIgnoreErrors();
       cleanupMerger(true);
     } catch (Throwable t) {
-      LOG.info("Error in cleaning up.., ", t);
+      LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t);
     }
   }
 
@@ -383,7 +386,7 @@ public class Shuffle implements ExceptionReporter {
   public synchronized void reportException(Throwable t) {
     // RunShuffleCallable onFailure deals with ignoring errors on shutdown.
     if (throwable.get() == null) {
-      LOG.info("Setting throwable in reportException with message [" + t.getMessage() +
+      LOG.info(srcNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() +
           "] from thread [" + Thread.currentThread().getName());
       throwable.set(t);
       throwingThreadName = Thread.currentThread().getName();
@@ -409,15 +412,15 @@ public class Shuffle implements ExceptionReporter {
   private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
     @Override
     public void onSuccess(TezRawKeyValueIterator result) {
-      LOG.info("Shuffle Runner thread complete");
+      LOG.info(srcNameTrimmed + ": " + "Shuffle Runner thread complete");
     }
 
     @Override
     public void onFailure(Throwable t) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown. Ignoring error");
+        LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring error");
       } else {
-        LOG.error("ShuffleRunner failed with error", t);
+        LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", t);
         // In case of an abort / Interrupt - the runtime makes sure that this is ignored.
         inputContext.fatalError(t, "Shuffle Runner Failed");
         cleanupIgnoreErrors();

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 9481e65..e0473b3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.BitSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.common.TezCommonUtils;
@@ -41,7 +43,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
-public class ShuffleInputEventHandlerOrderedGrouped {
+public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandler {
   
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class);
 
@@ -51,6 +53,11 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   private int maxMapRuntime = 0;
   private final boolean sslShuffle;
 
+  private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
+  private final AtomicInteger numDmeEvents = new AtomicInteger(0);
+  private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
+  private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
+
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
                                                 ShuffleScheduler scheduler, boolean sslShuffle) {
     this.inputContext = inputContext;
@@ -58,20 +65,36 @@ public class ShuffleInputEventHandlerOrderedGrouped {
     this.sslShuffle = sslShuffle;
   }
 
+  @Override
   public void handleEvents(List<Event> events) throws IOException {
     for (Event event : events) {
       handleEvent(event);
     }
   }
-  
-  
+
+  @Override
+  public void logProgress(boolean updateOnClose) {
+    LOG.info(inputContext.getSourceVertexName() + ": "
+        + "numDmeEventsSeen=" + numDmeEvents.get()
+        + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
+        + ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
+        + (updateOnClose == true ? ", updateOnClose" : ""));
+  }
+
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
+      numDmeEvents.incrementAndGet();
       processDataMovementEvent((DataMovementEvent) event);
       scheduler.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
+      numObsoletionEvents.incrementAndGet();
       processTaskFailedEvent((InputFailedEvent) event);
     }
+    if (numDmeEvents.get() + numObsoletionEvents.get() > nextToLogEventCount.get()) {
+      logProgress(false);
+      // Log every 50 events seen.
+      nextToLogEventCount.addAndGet(50);
+    }
   }
 
   private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
@@ -82,8 +105,11 @@ public class ShuffleInputEventHandlerOrderedGrouped {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     } 
     int partitionId = dmEvent.getSourceIndex();
-    LOG.info("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
-        + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
+          + ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
+          ShuffleUtils.stringify(shufflePayload));
+    }
     // TODO NEWTEZ See if this duration hack can be removed.
     int duration = shufflePayload.getRunDuration();
     if (duration > maxMapRuntime) {
@@ -101,6 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
                 "Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
                     + srcAttemptIdentifier + "]. Not fetching.");
           }
+          numDmeEventsNoData.incrementAndGet();
           scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
           return;
         }
@@ -120,10 +147,11 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
     InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
     scheduler.obsoleteInput(taIdentifier);
-    LOG.info("Obsoleting output of src-task: " + taIdentifier);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Obsoleting output of src-task: " + taIdentifier);
+    }
   }
 
-  // TODO NEWTEZ Handle encrypted shuffle
   @VisibleForTesting
   URI getBaseURI(String host, int port, int partitionId) {
     StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 26464bb..f45ca35 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -207,7 +207,6 @@ class ShuffleScheduler {
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
     numFetchers = Math.min(configuredNumFetchers, numInputs);
-    LOG.info("Num fetchers determined to be: " + numFetchers);
 
     localDiskFetchEnabled = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
@@ -260,7 +259,7 @@ class ShuffleScheduler {
 
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
+            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
 
     this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
@@ -293,7 +292,8 @@ class ShuffleScheduler {
         + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
         + ", abortFailureLimit=" + abortFailureLimit
         + ", maxMapRuntime=" + maxMapRuntime
-        + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
+        + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce
+        + ", numFetchers=" + numFetchers);
   }
 
   public void start() throws Exception {
@@ -305,6 +305,8 @@ class ShuffleScheduler {
   public void close() throws InterruptedException {
     if (!isShutdown.getAndSet(true)) {
 
+      logProgress();
+
       // Notify and interrupt the waiting scheduler thread
       synchronized (this) {
         notifyAll();
@@ -518,19 +520,24 @@ class ShuffleScheduler {
 
   @VisibleForTesting
   void reportExceptionForInput(Exception exception) {
-    LOG.error("Reporting exception for input", exception);
+    LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", exception);
     exceptionReporter.reportException(exception);
   }
 
+  private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);
+
   private void logProgress() {
-    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
     int inputsDone = numInputs - remainingMaps.get();
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
-    double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs +
-        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+    if (inputsDone > nextProgressLineEventCount.get() || inputsDone == numInputs || isShutdown.get()) {
+      nextProgressLineEventCount.addAndGet(50);
+      double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
+      long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+      double transferRate = mbs / secsSinceStart;
+      LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs +
+          ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+          + mbpsFormat.format(transferRate) + " MB/s)");
+    }
   }
 
   public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
@@ -585,7 +592,7 @@ class ShuffleScheduler {
   }
 
   public void reportLocalError(IOException ioe) {
-    LOG.error("Shuffle failed : caused by local error", ioe);
+    LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", ioe);
     // Shuffle knows how to deal with failures post shutdown via the onFailure hook
     exceptionReporter.reportException(ioe);
   }
@@ -598,7 +605,7 @@ class ShuffleScheduler {
       boolean connectError) {
     if ((reportReadErrorImmediately && (readError || connectError))
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for InputIdentifier: "
+      LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
           + srcAttempt + " taskAttemptIdentifier: "
           + TezRuntimeUtils.getTaskAttemptIdentifier(
           inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
@@ -655,7 +662,8 @@ class ShuffleScheduler {
         failureCounts.size() == (numInputs - doneMaps))
         && !reducerHealthy
         && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.error("Shuffle failed with too many fetch failures " + "and insufficient progress!"
+      LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures " +
+          "and insufficient progress!"
           + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
           + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
           + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
@@ -700,7 +708,7 @@ class ShuffleScheduler {
   
   public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
-    LOG.info("Adding obsolete input: " + srcAttempt);
+    LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
     if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
       //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
       //Fail fast here.
@@ -719,7 +727,9 @@ class ShuffleScheduler {
 
   public synchronized MapHost getHost() throws InterruptedException {
     while (pendingHosts.isEmpty() && remainingMaps.get() > 0) {
-      LOG.info("PendingHosts=" + pendingHosts);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PendingHosts=" + pendingHosts);
+      }
       wait();
     }
 
@@ -735,7 +745,7 @@ class ShuffleScheduler {
       pendingHosts.remove(host);
       host.markBusy();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
+        LOG.debug(srcNameTrimmed + ": " + "Assigning " + host + " with " + host.getNumKnownMapOutputs() +
             " to " + Thread.currentThread().getName());
       }
       shuffleStart.set(System.currentTimeMillis());
@@ -844,8 +854,10 @@ class ShuffleScheduler {
         notifyAll();
       }
     }
-    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
-        (System.currentTimeMillis() - shuffleStart.get()) + "ms");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(host + " freed by " + Thread.currentThread().getName() + " in " +
+          (System.currentTimeMillis() - shuffleStart.get()) + "ms");
+    }
   }
 
   public synchronized void resetKnownMaps() {
@@ -896,8 +908,8 @@ class ShuffleScheduler {
    */
   private class Referee extends Thread {
     public Referee() {
-      setName("ShufflePenaltyReferee ["
-          + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setName("ShufflePenaltyReferee {"
+          + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "}");
       setDaemon(true);
     }
 
@@ -955,7 +967,7 @@ class ShuffleScheduler {
                 ShuffleScheduler.this.wait();
               } catch (InterruptedException e) {
                 if (isShutdown.get()) {
-                  LOG.info(
+                  LOG.info(srcNameTrimmed + ": " +
                       "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                   Thread.currentThread().interrupt();
                   break;
@@ -968,7 +980,7 @@ class ShuffleScheduler {
         }
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("NumCompletedInputs: {}" + (numInputs - remainingMaps.get()));
+          LOG.debug(srcNameTrimmed + ": " + "NumCompletedInputs: {}" + (numInputs - remainingMaps.get()));
         }
 
         // Ensure there's memory available before scheduling the next Fetcher.
@@ -979,7 +991,7 @@ class ShuffleScheduler {
           mergeManager.waitForShuffleToMergeMemory();
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
-            LOG.info(
+            LOG.info(srcNameTrimmed + ": " +
                 "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
             Thread.currentThread().interrupt();
             break;
@@ -998,7 +1010,7 @@ class ShuffleScheduler {
                 mapHost = getHost();  // Leads to a wait.
               } catch (InterruptedException e) {
                 if (isShutdown.get()) {
-                  LOG.info(
+                  LOG.info(srcNameTrimmed + ": " +
                       "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                   Thread.currentThread().interrupt();
                   break;
@@ -1010,11 +1022,14 @@ class ShuffleScheduler {
                 break; // Check for the exit condition.
               }
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing pending host: " + mapHost.toString());
+                LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + mapHost.toString());
               }
               if (!isShutdown.get()) {
                 count++;
-                LOG.info("Scheduling fetch for inputHost: {}", mapHost.getIdentifier());
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
+                      mapHost.getIdentifier());
+                }
                 FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost);
                 runningFetchers.add(fetcherOrderedGrouped);
                 ListenableFuture<Void> future = fetcherExecutor.submit(fetcherOrderedGrouped);
@@ -1063,7 +1078,7 @@ class ShuffleScheduler {
     public void onSuccess(Void result) {
       fetcherOrderedGrouped.shutDown();
       if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring fetch complete");
+        LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch complete");
       } else {
         doBookKeepingForFetcherComplete();
       }
@@ -1073,9 +1088,9 @@ class ShuffleScheduler {
     public void onFailure(Throwable t) {
       fetcherOrderedGrouped.shutDown();
       if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring fetch complete");
+        LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch complete");
       } else {
-        LOG.error("Fetcher failed with error", t);
+        LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error", t);
         exceptionReporter.reportException(t);
         doBookKeepingForFetcherComplete();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index d27d08d..aa521ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -169,7 +169,10 @@ public abstract class ExternalSorter {
 
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
-    LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20)));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem bytes : " +
+          initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 20)));
+    }
     int assignedMb = (int) (initialMemoryAvailable >> 20);
     //Let the overflow checks happen in appropriate sorter impls
     this.availableMemoryMb = assignedMb;
@@ -187,9 +190,13 @@ public abstract class ExternalSorter {
     serializationFactory = new SerializationFactory(this.conf);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
-    LOG.info("keySerializer=" + keySerializer + "; valueSerializer=" + valSerializer
-        + "; comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
-        + "; conf=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+    LOG.info(outputContext.getDestinationVertexName() + " using: "
+        + "memoryMb=" + assignedMb
+        + ", keySerializerClass=" + keyClass
+        + ", valueSerializerClass=" + valSerializer
+        + ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
+        + ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS)
+        + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
 
     //    counters    
     mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
@@ -244,8 +251,7 @@ public abstract class ExternalSorter {
     
     // Task outputs
     mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext);
-    
-    LOG.info("Instantiating Partitioner: [" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) + "]");
+
     this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions);
     this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
     this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
@@ -339,9 +345,11 @@ public abstract class ExternalSorter {
         TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " " + initialMemRequestMb + " should be "
             + "larger than 0 and should be less than the available task memory (MB):" +
             (maxAvailableTaskMemory >> 20));
-    LOG.info("Requested SortBufferSize ("
-        + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): "
-        + initialMemRequestMb);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Requested SortBufferSize ("
+          + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): "
+          + initialMemRequestMb);
+    }
     return reqBytes;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index c4b2b3d..81f5211 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -1,4 +1,4 @@
-/**
+  /**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -120,7 +120,9 @@ public class PipelinedSorter extends ExternalSorter {
   PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable, int blkSize) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
-    
+
+    StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
+        .append(outputContext.getDestinationVertexName()).append(": ");
     partitionBits = bitcount(partitions)+1;
 
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
@@ -140,10 +142,29 @@ public class PipelinedSorter extends ExternalSorter {
     long usage = sortmb << 20;
     //Divide total memory into different blocks.
     int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
-    LOG.info("Number of Blocks : " + numberOfBlocks
-        + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize + ", finalMergeEnabled="
-        + isFinalMergeEnabled() + ", pipelinedShuffle=" + pipelinedShuffle + ", "
-        + "sendEmptyPartitionDetails=" + sendEmptyPartitionDetails);
+    initialSetupLogLine.append("#blocks=").append(numberOfBlocks);
+    initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage);
+    initialSetupLogLine.append(", BLOCK_SIZE=").append(blockSize);
+    initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
+    initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
+    initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
+    initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
+        "=").append(
+        sortmb);
+
+
+    initialSetupLogLine.append(", UsingHashComparator=");
+    // k/v serialization
+    if(comparator instanceof ProxyComparator) {
+      hasher = (ProxyComparator)comparator;
+      initialSetupLogLine.append(true);
+    } else {
+      hasher = null;
+      initialSetupLogLine.append(false);
+    }
+
+    LOG.info(initialSetupLogLine.toString());
+
     long totalCapacityWithoutMeta = 0;
     for (int i = 0; i < numberOfBlocks; i++) {
       Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage);
@@ -157,7 +178,6 @@ public class PipelinedSorter extends ExternalSorter {
     listIterator = bufferList.listIterator();
 
 
-    LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
     Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty " + bufferList.size());
     span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator);
     merger = new SpanMerger(); // SpanIterators are comparable
@@ -167,17 +187,11 @@ public class PipelinedSorter extends ExternalSorter {
                 TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
     sortmaster = Executors.newFixedThreadPool(sortThreads,
         new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Sorter [" + TezUtilsInternal
-            .cleanVertexName(outputContext.getDestinationVertexName()) + "] #%d")
+        .setNameFormat("Sorter {" + TezUtilsInternal
+            .cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
         .build());
 
-    // k/v serialization    
-    if(comparator instanceof ProxyComparator) {
-      hasher = (ProxyComparator)comparator;
-      LOG.info("Using the HashComparator");
-    } else {
-      hasher = null;
-    }    
+
     valSerializer.open(span.out);
     keySerializer.open(span.out);
     minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
@@ -217,7 +231,9 @@ public class PipelinedSorter extends ExternalSorter {
       merger.add(span.sort(sorter));
       boolean ret = spill(true);
       stopWatch.stop();
-      LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+      }
       if (pipelinedShuffle && ret) {
         sendPipelinedShuffleEvents();
       }
@@ -257,7 +273,8 @@ public class PipelinedSorter extends ExternalSorter {
         (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
         pathComponent, partitionStats);
     outputContext.sendEvents(events);
-    LOG.info("Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
+    LOG.info(outputContext.getDestinationVertexName() +
+        ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
   }
 
   @Override
@@ -363,11 +380,15 @@ public class PipelinedSorter extends ExternalSorter {
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
     // getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown
     final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
+    Path indexFilename =
+        mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+            * MAP_OUTPUT_INDEX_RECORD_LENGTH);
     spillFilePaths.put(numSpills, filename);
     FSDataOutputStream out = rfs.create(filename, true, 4096);
 
     try {
-      LOG.info("Spilling to " + filename.toString());
+      LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
+          ", indexFilename=" + indexFilename);
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
           return;
@@ -403,10 +424,6 @@ public class PipelinedSorter extends ExternalSorter {
         }
       }
 
-      Path indexFilename =
-          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-      LOG.info("Spill Index filename:" + indexFilename);
       spillFileIndexPaths.put(numSpills, indexFilename);
       spillRec.writeToFile(indexFilename, conf);
       //TODO: honor cache limits
@@ -437,8 +454,8 @@ public class PipelinedSorter extends ExternalSorter {
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        LOG.info("Interrupted while waiting for mergers to complete");
-        throw new IOInterruptedException("Interrupted while waiting for mergers to complete", e);
+        LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete");
+        throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
       }
 
       // create spill file
@@ -449,7 +466,7 @@ public class PipelinedSorter extends ExternalSorter {
         mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename, true, 4096);
-      LOG.info("Spilling to " + filename.toString());
+      LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
           return false;
@@ -509,7 +526,7 @@ public class PipelinedSorter extends ExternalSorter {
         cleanup();
       }
       sortmaster.shutdownNow();
-      LOG.info("Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
+      LOG.info(outputContext.getDestinationVertexName() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
           .isShutdown() + ", terminated=" + sortmaster.isTerminated());
       return true;
     }
@@ -530,7 +547,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
 
     try {
-      LOG.info("Starting flush of map output");
+      LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output");
       span.end();
       merger.add(span.sort(sorter));
       // force a spill in flush()
@@ -549,7 +566,14 @@ public class PipelinedSorter extends ExternalSorter {
 
 
       if(indexCacheList.isEmpty()) {
-        LOG.warn("Index list is empty... returning");
+        /*
+         * If we do not have this check, and if the task gets killed in the middle, it can throw
+         * NPE leading to distraction when debugging.
+         */
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(outputContext.getDestinationVertexName()
+              + ": Index list is empty... returning");
+        }
         return;
       }
 
@@ -567,7 +591,7 @@ public class PipelinedSorter extends ExternalSorter {
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats);
-          LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+          LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);
         return;
@@ -586,8 +610,9 @@ public class PipelinedSorter extends ExternalSorter {
 
         sameVolRename(filename, finalOutputFile);
         sameVolRename(indexFilename, finalIndexFile);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", "
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills +
+              ", finalOutputFile=" + finalOutputFile + ", "
               + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
               indexFilename);
         }
@@ -608,7 +633,7 @@ public class PipelinedSorter extends ExternalSorter {
           mapOutputFile.getOutputIndexFileForWrite(0); //TODO
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(
+        LOG.debug(outputContext.getDestinationVertexName() + ": " +
             "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
                 + finalIndexFile);
       }
@@ -773,8 +798,8 @@ public class PipelinedSorter extends ExternalSorter {
       }
       ByteBuffer reserved = source.duplicate();
       reserved.mark();
-      LOG.info("reserved.remaining() = " + reserved.remaining());
-      LOG.info("reserved.metasize = "+ metasize);
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" +
+          reserved.remaining() + ", reserved.metasize=" + metasize);
       reserved.position(metasize);
       kvbuffer = reserved.slice();
       reserved.flip();
@@ -793,7 +818,7 @@ public class PipelinedSorter extends ExternalSorter {
       if(length() > 1) {
         sorter.sort(this, 0, length(), nullProgressable);
       }
-      LOG.info("done sorting span=" + index + ", length=" + length() + ", "
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
           + "time=" + (System.currentTimeMillis() - start));
       return new SpanIterator(this);
     }
@@ -861,7 +886,7 @@ public class PipelinedSorter extends ExternalSorter {
         newSpan = new SortSpan(remaining, items, perItem,
             ConfigUtils.getIntermediateOutputKeyComparator(conf));
         newSpan.index = index+1;
-        LOG.info(String.format("New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
+        LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
             .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
         return newSpan;
       }
@@ -883,11 +908,11 @@ public class PipelinedSorter extends ExternalSorter {
         return null;
       }
       int perItem = kvbuffer.position()/items;
-      LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+      LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
       if(remaining.remaining() < METASIZE+perItem) {
         //Check if we can get the next Buffer from the main buffer list
         if (listIterator.hasNext()) {
-          LOG.info("Getting memory from next block in the list, recordsWritten=" +
+          LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" +
               mapOutputRecordCounter.getValue());
           reinit = true;
           return listIterator.next();
@@ -1184,10 +1209,10 @@ public class PipelinedSorter extends ExternalSorter {
             total += sp.span.length();
             eq += sp.span.getEq();
         }
-        LOG.info("Heap = " + sb.toString());
+        LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
         return true;
       } catch(ExecutionException e) {
-        LOG.info(e.toString());
+        LOG.info(outputContext.getDestinationVertexName() + ": " + e.toString());
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index edc02f3..727f9a2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -130,7 +130,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     final float spillper = this.conf.getFloat(
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
-    final int sortmb = computeSortBufferSize((int) availableMemoryMb);
+    final int sortmb = computeSortBufferSize((int) availableMemoryMb, outputContext.getDestinationVertexName());
 
     Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float) 0.0,
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT
@@ -144,7 +144,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
     if (confPipelinedShuffle) {
-      LOG.warn(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED  + " does not work "
+      LOG.warn(outputContext.getDestinationVertexName() + ": " +
+          TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work "
           + "with DefaultSorter. It is supported only with PipelinedSorter.");
     }
 
@@ -164,10 +165,14 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     softLimit = (int)(kvbuffer.length * spillper);
     bufferRemaining = softLimit;
     if (LOG.isInfoEnabled()) {
-      LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb);
-      LOG.info("soft limit at " + softLimit);
-      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; finalMergeEnabled = " + isFinalMergeEnabled());
+      LOG.info(outputContext.getDestinationVertexName() + ": "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "=" + sortmb
+          + ", soft limit=" + softLimit
+          + ", bufstart=" + bufstart
+          + ", bufvoid=" + bufvoid
+          + ", kvstart=" + kvstart
+          + ", length=" + maxRec
+          + ", finalMergeEnabled=" + isFinalMergeEnabled());
     }
 
     // k/v serialization
@@ -177,8 +182,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     spillInProgress = false;
     minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
     spillThread.setDaemon(true);
-    spillThread.setName("SpillThread ["
-        + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName() + "]"));
+    spillThread.setName("SpillThread {"
+        + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName() + "}"));
     spillLock.lock();
     try {
       spillThread.start();
@@ -200,7 +205,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   }
 
   @VisibleForTesting
-  static int computeSortBufferSize(int availableMemoryMB) {
+  static int computeSortBufferSize(int availableMemoryMB, String logContext) {
 
     if (availableMemoryMB <= 0) {
       throw new RuntimeException(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
@@ -208,7 +213,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     }
 
     if (availableMemoryMB > MAX_IO_SORT_MB) {
-      LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
+      LOG.warn(logContext + ": Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
           "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB
           + " (max sort buffer size supported forDefaultSorter)");
     }
@@ -354,7 +359,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity());
       totalKeys++;
     } catch (MapBufferTooSmallException e) {
-      LOG.info("Record too large for in-memory buffer: " + e.getMessage());
+      LOG.info(outputContext.getDestinationVertexName() + ": Record too large for in-memory buffer: " + e.getMessage());
       spillSingleRecord(key, value, partition);
       mapOutputRecordCounter.increment(1);
       return;
@@ -373,7 +378,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     // Cast one of the operands to long to avoid integer overflow
     kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
     if (LOG.isInfoEnabled()) {
-      LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex +
           "(" + (kvindex * 4) + ")");
     }
   }
@@ -391,7 +396,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     // Cast one of the operands to long to avoid integer overflow
     kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
     if (LOG.isInfoEnabled()) {
-      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" +
         (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
     }
   }
@@ -647,7 +652,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       spillThread.interrupt();
       spillThread.join();
     } catch (InterruptedException e) {
-      LOG.info("Spill thread interrupted");
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted");
       //Reset status
       Thread.currentThread().interrupt();
       throw new IOInterruptedException("Spill failed", e);
@@ -656,7 +661,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
   @Override
   public void flush() throws IOException {
-    LOG.info("Starting flush of map output");
+    LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output");
     if (Thread.currentThread().isInterrupted()) {
       /**
        * Possible that the thread got interrupted when flush was happening or when the flush was
@@ -691,13 +696,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         kvend = (kvindex + NMETA) % kvmeta.capacity();
         bufend = bufmark;
         if (LOG.isInfoEnabled()) {
-          LOG.info("Sorting & Spilling map output");
-          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                   "; bufvoid = " + bufvoid);
-          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-                   "); kvend = " + kvend + "(" + (kvend * 4) +
-                   "); length = " + (distanceTo(kvend, kvstart,
-                         kvmeta.capacity()) + 1) + "/" + maxRec);
+          LOG.info(
+              outputContext.getDestinationVertexName() + ": " + "Sorting & Spilling map output. "
+                  + "bufstart = " + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
+                  + "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+                  + ", kvend = " + kvend + "(" + (kvend * 4) + ")"
+                  + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" +
+                  maxRec);
         }
         sortAndSpill();
       }
@@ -743,7 +748,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
             spillLock.unlock();
             sortAndSpill();
           } catch (Throwable t) {
-            LOG.warn("Got an exception in sortAndSpill", t);
+            LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t);
             sortSpillException = t;
           } finally {
             spillLock.lock();
@@ -756,7 +761,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           }
         }
       } catch (InterruptedException e) {
-        LOG.info("Spill thread interrupted");
+        LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted");
         Thread.currentThread().interrupt();
       } finally {
         spillLock.unlock();
@@ -787,13 +792,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     bufend = bufmark;
     spillInProgress = true;
     if (LOG.isInfoEnabled()) {
-      LOG.info("Spilling map output");
-      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-               "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-               "); kvend = " + kvend + "(" + (kvend * 4) +
-               "); length = " + (distanceTo(kvend, kvstart,
-                     kvmeta.capacity()) + 1) + "/" + maxRec);
+      LOG.info(outputContext.getDestinationVertexName() + ": Spilling map output. "
+          + "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
+          +"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+          +", kvend = " + kvend + "(" + (kvend * 4) + ")"
+          + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec);
     }
     spillReady.signal();
   }
@@ -889,7 +892,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
               TezRawKeyValueIterator kvIter =
                 new MRResultIterator(spstart, spindex);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Running combine processor");
+                LOG.debug(outputContext.getDestinationVertexName() + ": " + "Running combine processor");
               }
               runCombineProcessor(kvIter, writer);
             }
@@ -927,7 +930,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         totalIndexCacheMemory +=
           spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
       }
-      LOG.info("Finished spill " + numSpills);
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills);
       ++numSpills;
       if (!isFinalMergeEnabled()) {
         numShuffleChunks.setValue(numSpills);
@@ -1113,7 +1116,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
         partitionStats);
 
-    LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
+    LOG.info(outputContext.getDestinationVertexName() + ": " +
+        "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
 
     if (sendEvent) {
       outputContext.sendEvents(events);
@@ -1271,7 +1275,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           segmentList.add(i, s);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
+            LOG.debug(outputContext.getDestinationVertexName() + ": "
+                + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
                 "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
                 indexRecord.getRawLength() + ", " +
                 indexRecord.getPartLength() + ")");

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 9a98cd1..70b345f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -92,6 +92,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   // Maybe setup a separate statistics class which can be shared between the
   // buffer and the main path instead of having multiple arrays.
 
+  private final String destNameTrimmed;
   private final long availableMemory;
   @VisibleForTesting
   final WrappedBuffer[] buffers;
@@ -150,6 +151,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     super(outputContext, conf, numOutputs);
     Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
 
+    this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
     //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
     // this case.  Add it later if needed.
     pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
@@ -171,13 +173,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     int maxSingleBufferSizeBytes = conf.getInt(
         TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE);
     computeNumBuffersAndSize(maxSingleBufferSizeBytes);
-    LOG.info("Running with numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer);
+
     availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
     buffers = new WrappedBuffer[numBuffers];
     // Set up only the first buffer to start with.
     buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
     numInitializedBuffers = 1;
-    LOG.info("Initialize Buffer #" + numInitializedBuffers + " with size=" + sizePerBuffer);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(destNameTrimmed + ": " + "Initializing Buffer #" +
+          numInitializedBuffers + " with size=" + sizePerBuffer);
+    }
     currentBuffer = buffers[0];
     baos = new ByteArrayOutputStream();
     dos = new DataOutputStream(baos);
@@ -190,8 +195,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
-                "UnorderedOutSpiller ["
-                    + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "]")
+                "UnorderedOutSpiller {"
+                    + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}")
             .build());
     spillExecutor = MoreExecutors.listeningDecorator(executor);
     numRecordsPerPartition = new int[numPartitions];
@@ -214,7 +219,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       skipBuffers = false;
       writer = null;
     }
-    LOG.info("pipelinedShuffle=" + pipelinedShuffle + ", skipBuffers=" + skipBuffers);
+    LOG.info(destNameTrimmed + ": "
+        + "numBuffers=" + numBuffers
+        + ", sizePerBuffer=" + sizePerBuffer
+        + ", skipBuffers=" + skipBuffers
+        + ", pipelinedShuffle=" + pipelinedShuffle
+        + ", numPartitions=" + numPartitions);
   }
 
   private void computeNumBuffersAndSize(int bufferLimit) {
@@ -321,7 +331,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       currentBuffer.reset();
     } else {
       // Update overall stats
-      LOG.info("Moving to next buffer and triggering spill");
+      LOG.info(destNameTrimmed + ": " + "Moving to next buffer and triggering spill");
       updateGlobalStats(currentBuffer);
 
       pendingSpillCount.incrementAndGet();
@@ -420,10 +430,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       spillResult = new SpillResult(compressedLength, this.wrappedBuffer);
 
       handleSpillIndex(spillPathDetails, spillRecord);
-      LOG.info("Finished spill " + spillIndex);
+      LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Spill=" + spillIndex + ", indexPath="
+        LOG.debug(destNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath="
             + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath);
       }
       return spillResult;
@@ -461,7 +471,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     isShutdown.set(true);
     spillLock.lock();
     try {
-      LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
+      LOG.info(destNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
       while (pendingSpillCount.get() != 0 && spillException == null) {
         spillInProgress.await();
       }
@@ -469,7 +479,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       spillLock.unlock();
     }
     if (spillException != null) {
-      LOG.error("Error during spill, throwing");
+      LOG.error(destNameTrimmed + ": " + "Error during spill, throwing");
       // Assuming close will be called on the same thread as the write
       cleanup();
       currentBuffer.cleanup();
@@ -480,7 +490,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         throw new IOException(spillException);
       }
     } else {
-      LOG.info("All spills complete");
+      LOG.info(destNameTrimmed + ": " + "All spills complete");
       // Assuming close will be called on the same thread as the write
       cleanup();
 
@@ -687,7 +697,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       for (int i = 0; i < numPartitions; i++) {
         long segmentStart = out.getPos();
         if (numRecordsPerPartition[i] == 0) {
-          LOG.info("Skipping partition: " + i + " in final merge since it has no records");
+          LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
           continue;
         }
         writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
@@ -739,7 +749,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
     finalSpillRecord.writeToFile(finalIndexPath, conf);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
-    LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
+    LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
   }
 
   private void writeLargeRecord(final Object key, final Object value, final int partition)
@@ -791,9 +801,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
       sendPipelinedEventForSpill(emptyPartitions, spillIndex, false);
 
-      LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillIndex);
+      LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("LargeRecord Spill=" + spillIndex + ", indexPath="
+        LOG.debug(destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath="
             + spillPathDetails.indexFilePath + ", outputPath="
             + spillPathDetails.outputFilePath);
       }
@@ -902,10 +912,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
           pathComponent, emptyPartitions);
 
-      LOG.info("Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
+      LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
       outputContext.sendEvents(Collections.singletonList(compEvent));
     } catch (IOException e) {
-      LOG.error("Error in sending pipelined events", e);
+      LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e);
       outputContext.fatalError(e, "Error in sending pipelined events");
     }
   }
@@ -925,7 +935,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     @Override
     public void onSuccess(SpillResult result) {
-      LOG.info("Spill# " + spillNumber + " complete.");
       spilledSize += result.spillSize;
 
       sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false);
@@ -935,7 +944,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         availableBuffers.add(result.wrappedBuffer);
 
       } catch (Throwable e) {
-        LOG.error("Failure while attempting to reset buffer after spill", e);
+        LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset buffer after spill", e);
         outputContext.fatalError(e, "Failure while attempting to reset buffer after spill");
       }
 
@@ -960,7 +969,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     public void onFailure(Throwable t) {
       // spillException setup to throw an exception back to the user. Requires synchronization.
       // Consider removing it in favor of having Tez kill the task
-      LOG.error("Failure while spilling to disk", t);
+      LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t);
       spillException = t;
       outputContext.fatalError(t, "Failure while spilling to disk");
       spillLock.lock();

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 7d887de..880dc2f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -132,8 +132,10 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
       List<Event> pending = new LinkedList<Event>();
       pendingEvents.drainTo(pending);
       if (pending.size() > 0) {
-        LOG.info("NoAutoStart delay in processing first event: "
-            + (System.currentTimeMillis() - firstEventReceivedTime));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NoAutoStart delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
         shuffle.handleEvents(pending);
       }
       isStarted.set(true);
@@ -283,10 +285,16 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
   protected synchronized void createValuesIterator()
       throws IOException {
     // Not used by ReduceProcessor
-    vIter = new ValuesIterator(rawIter,
-        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
-        ConfigUtils.getIntermediateInputKeyClass(conf),
-        ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
+    RawComparator rawComparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+    Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    LOG.info(getContext().getSourceVertexName() + ": " + "creating ValuesIterator with "
+        + "comparator=" + rawComparator.getClass().getName()
+        + ", keyClass=" + keyClass.getName()
+        + ", valClass=" + valClass.getName());
+
+    vIter = new ValuesIterator(rawIter, rawComparator, keyClass, valClass,
+        conf, inputKeyCounter, inputValueCounter);
 
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 271eed3..fad164f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.library.common.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,7 +133,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       ifileBufferSize = conf.getInt("io.file.buffer.size",
           TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
-      this.inputManager = new SimpleFetchedInputAllocator(getContext().getUniqueIdentifier(), conf,
+      this.inputManager = new SimpleFetchedInputAllocator(
+          TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), getContext().getUniqueIdentifier(), conf,
           getContext().getTotalMemoryAvailableToTask(),
           memoryUpdateCallbackHandler.getMemoryAssigned());
 
@@ -150,8 +152,10 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       List<Event> pending = new LinkedList<Event>();
       pendingEvents.drainTo(pending);
       if (pending.size() > 0) {
-        LOG.info("NoAutoStart delay in processing first event: "
-            + (System.currentTimeMillis() - firstEventReceivedTime));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getContext().getSourceVertexName() + ": " + "NoAutoStart delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
         inputEventHandler.handleEvents(pending);
       }
       isStarted.set(true);
@@ -208,6 +212,10 @@ public class UnorderedKVInput extends AbstractLogicalInput {
 
   @Override
   public synchronized List<Event> close() throws Exception {
+    if (this.inputEventHandler != null) {
+      this.inputEventHandler.logProgress(true);
+    }
+
     if (this.shuffleManager != null) {
       this.shuffleManager.shutdown();
     }
@@ -218,7 +226,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     long inputRecords = getContext().getCounters()
         .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
     getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
-    
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 98d75a9..45b6713 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -126,7 +126,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
       if (pipelinedShuffle) {
         if (finalMergeEnabled) {
-          LOG.info("Disabling final merge as "
+          LOG.info(getContext().getDestinationVertexName() + " disabling final merge as "
               + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled.");
           finalMergeEnabled = false;
           conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
@@ -184,8 +184,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       this.endTime = System.nanoTime();
       returnEvents = generateEvents();
     } else {
-      LOG.warn(
-          "Attempting to close output {} of type {} before it was started. Generating empty events",
+      LOG.warn(getContext().getDestinationVertexName() +
+          ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = generateEmptyEvents();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index b50f17d..879c2e0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -104,7 +104,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
       this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1,
           memoryUpdateCallbackHandler.getMemoryAssigned());
       isStarted.set(true);
-      LOG.info(this.getClass().getSimpleName() + " started. MemoryAssigned="
+      LOG.info(getContext().getDestinationVertexName() + " started. MemoryAssigned="
           + memoryUpdateCallbackHandler.getMemoryAssigned());
     }
   }
@@ -127,8 +127,8 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
       //TODO: Do we need to support sending payloads via events?
       returnEvents = kvWriter.close();
     } else {
-      LOG.warn(
-          "Attempting to close output {} of type {} before it was started. Generating empty events",
+      LOG.warn(getContext().getDestinationVertexName() +
+          ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = new LinkedList<Event>();
       ShuffleUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 7498627..90c0ed4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -104,8 +104,8 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     if (isStarted.get()) {
       returnEvents = kvWriter.close();
     } else {
-      LOG.warn(
-          "Attempting to close output {} of type {} before it was started. Generating empty events",
+      LOG.warn(getContext().getDestinationVertexName() +
+          ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = new LinkedList<Event>();
       ShuffleUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
index ffa9429..2f89b0f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
@@ -52,7 +52,7 @@ public class TestSimpleFetchedInputAllocator {
     long inMemThreshold = (long) (bufferPercent * jvmMax);
     LOG.info("InMemThreshold: " + inMemThreshold);
 
-    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(UUID.randomUUID().toString(),
+    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator("srcName", UUID.randomUUID().toString(),
         conf, Runtime.getRuntime().maxMemory(), inMemThreshold);
 
     long requestSize = (long) (0.4f * inMemThreshold);

http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index c22e605..b531464 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -214,19 +214,19 @@ public class TestDefaultSorter {
   public void testSortMBLimits() throws Exception {
 
     assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
-        DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB);
+        DefaultSorter.computeSortBufferSize(4096, "") == DefaultSorter.MAX_IO_SORT_MB);
     assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
-        DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB);
-    assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024);
+        DefaultSorter.computeSortBufferSize(2047, "") == DefaultSorter.MAX_IO_SORT_MB);
+    assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024, "") == 1024);
 
     try {
-      DefaultSorter.computeSortBufferSize(0);
+      DefaultSorter.computeSortBufferSize(0, "");
       fail("Should have thrown error for setting buffer size to 0");
     } catch(RuntimeException re) {
     }
 
     try {
-      DefaultSorter.computeSortBufferSize(-100);
+      DefaultSorter.computeSortBufferSize(-100, "");
       fail("Should have thrown error for setting buffer size to negative value");
     } catch(RuntimeException re) {
     }


Mime
View raw message