tez-commits mailing list archives

From hit...@apache.org
Subject [1/2] TEZ-175. Cleanup logs.
Date Mon, 03 Jun 2013 23:47:24 GMT
Updated Branches:
  refs/heads/TEZ-1 9f040cf8b -> 9feab053f
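
The recurring change in this patch is the guarded-debug idiom: Java evaluates
the string concatenation in a log argument before the call, so an unguarded
LOG.info("DEBUG: ...") or LOG.debug(...) pays the full formatting cost even
when the message is ultimately dropped. A minimal sketch of the guard applied
throughout, assuming the commons-logging API these classes already use:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class GuardedDebugSketch {
      private static final Log LOG = LogFactory.getLog(GuardedDebugSketch.class);

      void handle(Object event) {
        // The guard skips the string concatenation entirely when debug is off.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processing event: " + event);
        }
      }
    }

The remaining changes strip trailing whitespace, delete commented-out code,
and drop the ad-hoc "XXX"/"DEBUG:"/"ZZZZ" prefixes from messages that stay.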


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 4c73559..a79358b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -55,12 +55,12 @@ public class AMNodeImpl implements AMNode {
   private int numSuccessfulTAs = 0;
   private boolean blacklistingEnabled;
   private boolean ignoreBlacklisting = false;
-  
+
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
 
   private final List<ContainerId> containers = new LinkedList<ContainerId>();
-  
+
   //Book-keeping only. In case of Health status change.
   private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
 
@@ -69,8 +69,8 @@ public class AMNodeImpl implements AMNode {
   private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine;
 
   private static StateMachineFactory
-  <AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent> 
-  stateMachineFactory = 
+  <AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>
+  stateMachineFactory =
   new StateMachineFactory<AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>(
   AMNodeState.ACTIVE)
         // Transitions from ACTIVE state.
@@ -97,7 +97,7 @@ public class AMNodeImpl implements AMNode {
     .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
     .addTransition(AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
     .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED), new GenericErrorTransition())
-            
+
         // Transitions from UNHEALTHY state.
     .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileUnhealthyTransition())
     .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, EnumSet.of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
@@ -107,7 +107,7 @@ public class AMNodeImpl implements AMNode {
     .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
 
         .installTopology();
-  
+
 
   @SuppressWarnings("rawtypes")
   public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
@@ -153,10 +153,12 @@ public class AMNodeImpl implements AMNode {
 
   @Override
   public void handle(AMNodeEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing AMNodeEvent " + event.getNodeId()
+          + " of type " + event.getType() + " while in state: " + getState()
+          + ". Event: " + event);
+    }
     this.writeLock.lock();
-    LOG.info("DEBUG: Processing AMNodeEvent " + event.getNodeId()
-        + " of type " + event.getType() + " while in state: " + getState()
-        + ". Event: " + event);
     try {
       final AMNodeState oldState = getState();
       try {
@@ -174,7 +176,7 @@ public class AMNodeImpl implements AMNode {
       writeLock.unlock();
     }
   }
-  
+
   protected boolean shouldBlacklistNode() {
     return blacklistingEnabled && (numFailedTAs >= maxTaskFailuresPerNode);
   }
@@ -193,7 +195,7 @@ public class AMNodeImpl implements AMNode {
   //////////////////////////////////////////////////////////////////////////////
   //                   Start of Transition Classes                            //
   //////////////////////////////////////////////////////////////////////////////
-  
+
   protected static class ContainerAllocatedTransition implements
       SingleArcTransition<AMNodeImpl, AMNodeEvent> {
     @Override
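
The transition table in the AMNodeImpl hunks above uses YARN's
StateMachineFactory fluent API: each addTransition names a pre-state, a
post-state (or a set of possible post-states), a triggering event type, and
an optional hook, and installTopology() freezes the table once for all
instances. A minimal sketch of the idiom, with hypothetical node states and
events standing in for the real ones:

    import org.apache.hadoop.yarn.state.SingleArcTransition;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    class NodeFsmSketch {
      enum S { ACTIVE, UNHEALTHY }
      enum E { TURNED_UNHEALTHY, TURNED_HEALTHY }

      private static final StateMachineFactory<NodeFsmSketch, S, E, E> factory =
          new StateMachineFactory<NodeFsmSketch, S, E, E>(S.ACTIVE)
              .addTransition(S.ACTIVE, S.UNHEALTHY, E.TURNED_UNHEALTHY,
                  new SingleArcTransition<NodeFsmSketch, E>() {
                    public void transition(NodeFsmSketch node, E event) {
                      // side effects of the transition go here
                    }
                  })
              .addTransition(S.UNHEALTHY, S.ACTIVE, E.TURNED_HEALTHY)
              .installTopology();

      private final StateMachine<S, E, E> stateMachine = factory.make(this);
    }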

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
index f683605..b3ebb8b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
@@ -35,62 +35,64 @@ import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 @InterfaceStability.Unstable
 public class InMemoryWriter extends Writer {
   private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
-  
+
   private DataOutputStream out;
-  
+
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
     super(null);
-    this.out = 
+    this.out =
       new DataOutputStream(new IFileOutputStream(arrayStream));
   }
-  
+
   public void append(Object key, Object value) throws IOException {
     throw new UnsupportedOperationException
     ("InMemoryWriter.append(K key, V value");
   }
-  
+
   public void append(DataInputBuffer key, DataInputBuffer value)
   throws IOException {
     int keyLength = key.getLength() - key.getPosition();
     if (keyLength < 0) {
-      throw new IOException("Negative key-length not allowed: " + keyLength + 
+      throw new IOException("Negative key-length not allowed: " + keyLength +
                             " for " + key);
     }
-    
+
     boolean sameKey = (key == IFile.REPEAT_KEY);
-    
+
     int valueLength = value.getLength() - value.getPosition();
     if (valueLength < 0) {
-      throw new IOException("Negative value-length not allowed: " + 
+      throw new IOException("Negative value-length not allowed: " +
                             valueLength + " for " + value);
     }
-    
+
     if(sameKey) {
       WritableUtils.writeVInt(out, IFile.RLE_MARKER);
       WritableUtils.writeVInt(out, valueLength);
       out.write(value.getData(), value.getPosition(), valueLength);
     } else {
-      LOG.info("XXX InMemWriter.append" + 
-          " key.data=" + key.getData() + 
-          " key.pos=" + key.getPosition() + 
-          " key.len=" +key.getLength() + 
-          " val.data=" + value.getData() + 
-          " val.pos=" + value.getPosition() + 
-          " val.len=" + value.getLength());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("InMemWriter.append" +
+            " key.data=" + key.getData() +
+            " key.pos=" + key.getPosition() +
+            " key.len=" +key.getLength() +
+            " val.data=" + value.getData() +
+            " val.pos=" + value.getPosition() +
+            " val.len=" + value.getLength());
+      }
       WritableUtils.writeVInt(out, keyLength);
       WritableUtils.writeVInt(out, valueLength);
-      out.write(key.getData(), key.getPosition(), keyLength); 
-      out.write(value.getData(), value.getPosition(), valueLength);      
+      out.write(key.getData(), key.getPosition(), keyLength);
+      out.write(value.getData(), value.getPosition(), valueLength);
     }
-     
+
   }
 
   public void close() throws IOException {
     // Write EOF_MARKER for key/value length
     WritableUtils.writeVInt(out, IFile.EOF_MARKER);
     WritableUtils.writeVInt(out, IFile.EOF_MARKER);
-    
-    // Close the stream 
+
+    // Close the stream
     out.close();
     out = null;
   }
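
append() above emits the IFile wire format: a vint key length, a vint value
length, then the raw key and value bytes, with a reserved RLE marker in place
of the key length for runs of identical keys, and a pair of EOF markers
written by close(). A sketch of the framing (the marker values below are
assumptions for illustration; the real constants live in IFile):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    class IFileFramingSketch {
      static final int RLE_MARKER = -2;  // assumed value of IFile.RLE_MARKER
      static final int EOF_MARKER = -1;  // assumed value of IFile.EOF_MARKER

      // Normal record: <vint keyLen><vint valLen><key bytes><value bytes>.
      static void writeRecord(DataOutputStream out, byte[] key, byte[] val)
          throws IOException {
        WritableUtils.writeVInt(out, key.length);
        WritableUtils.writeVInt(out, val.length);
        out.write(key);
        out.write(val);
      }

      // Repeated key: <vint RLE_MARKER><vint valLen><value bytes>.
      static void writeRepeat(DataOutputStream out, byte[] val)
          throws IOException {
        WritableUtils.writeVInt(out, RLE_MARKER);
        WritableUtils.writeVInt(out, val.length);
        out.write(val);
      }

      // End of stream: a pair of EOF markers, as written by close() above.
      static void writeEof(DataOutputStream out) throws IOException {
        WritableUtils.writeVInt(out, EOF_MARKER);
        WritableUtils.writeVInt(out, EOF_MARKER);
      }
    }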

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 76fc2e9..f5976e3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -86,12 +86,12 @@ public abstract class ExternalSorter {
   protected SerializationFactory serializationFactory;
   protected Serializer keySerializer;
   protected Serializer valSerializer;
-  
+
   protected IndexedSorter sorter;
 
   // Compression for map-outputs
   protected CompressionCodec codec;
-  
+
   // Counters
   protected TezCounter mapOutputByteCounter;
   protected TezCounter mapOutputRecordCounter;
@@ -101,41 +101,41 @@ public abstract class ExternalSorter {
 
   public void initialize(Configuration conf, Master master)
       throws IOException, InterruptedException {
-    
+
     this.job = conf;
-    LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " + 
+    LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " +
         job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
 
     partitions = task.getOutputSpecList().get(0).getNumOutputs();
-//    partitions = 
+//    partitions =
 //        job.getInt(
-//            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, 
+//            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE,
 //            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
     rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
-    
+
     // sorter
     sorter = ReflectionUtils.newInstance(job.getClass(
         TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
         IndexedSorter.class), job);
-    
+
     comparator = ConfigUtils.getIntermediateOutputKeyComparator(job);
-    
+
     // k/v serialization
     keyClass = ConfigUtils.getIntermediateOutputKeyClass(job);
     valClass = ConfigUtils.getIntermediateOutputValueClass(job);
     serializationFactory = new SerializationFactory(job);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
-    
+
     //    counters
-    mapOutputByteCounter = 
+    mapOutputByteCounter =
         runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
     mapOutputRecordCounter =
       runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
-    fileOutputByteCounter = 
+    fileOutputByteCounter =
         runningTaskContext.getTaskReporter().
             getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
-    spilledRecordsCounter = 
+    spilledRecordsCounter =
         runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
     // compression
     if (ConfigUtils.shouldCompressIntermediateOutput(job)) {
@@ -146,14 +146,13 @@ public abstract class ExternalSorter {
       codec = null;
     }
 
-    // Task outputs 
+    // Task outputs
     mapOutputFile =
         (TezTaskOutput) ReflectionUtils.newInstance(
             conf.getClass(
-                Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, 
+                Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
                 TezTaskOutputFiles.class), conf);
-//    LOG.info("XXX mapOutputFile: " + mapOutputFile.getClass());
-    
+
     // sortPhase
     sortPhase  = runningTaskContext.getProgress().addPhase("sort", 0.333f);
   }
@@ -227,13 +226,8 @@ public abstract class ExternalSorter {
     if (!src.renameTo(dst)) {
       throw new IOException("Unable to rename " + src + " to " + dst);
     }
-//    LOG.info("XXX sameVolRename src=" + src + ", dst=" + dst);
   }
 
-//  public ExternalSorter() {
-//    super();
-//  }
-  
   public ExternalSorter(TezEngineTaskContext tezEngineTask) {
     this.task = tezEngineTask;
   }
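
initialize() above wires every pluggable piece from the Configuration: the
in-memory sorter, the intermediate key/value classes and their serializers,
the comparator, the counters, and the optional intermediate-output codec. The
sorter selection follows Hadoop's conf-driven reflection pattern; a sketch
(the key string is an assumed stand-in for
TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.IndexedSorter;
    import org.apache.hadoop.util.QuickSort;
    import org.apache.hadoop.util.ReflectionUtils;

    class PluggableSorterSketch {
      static IndexedSorter createSorter(Configuration job) {
        // Look up the configured implementation, defaulting to QuickSort,
        // and instantiate it with the job conf injected.
        return ReflectionUtils.newInstance(
            job.getClass("tez.engine.internal.sorter.class",  // assumed key
                QuickSort.class, IndexedSorter.class),
            job);
      }
    }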

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
index 75fcd68..00b8958 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
@@ -36,11 +36,9 @@ import org.apache.hadoop.util.DataChecksum;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class IFileOutputStream extends FilterOutputStream {
-  
-  private static final Log LOG = LogFactory.getLog(IFileOutputStream.class);
-  
+
   /**
-   * The output stream to be checksummed. 
+   * The output stream to be checksummed.
    */
   private final DataChecksum sum;
   private byte[] barray;
@@ -62,7 +60,7 @@ public class IFileOutputStream extends FilterOutputStream {
     buffer = new byte[4096];
     offset = 0;
   }
-  
+
   @Override
   public void close() throws IOException {
     if (closed) {
@@ -105,8 +103,8 @@ public class IFileOutputStream extends FilterOutputStream {
     // FIXME if needed re-enable this in debug mode
     if (LOG.isDebugEnabled()) {
       LOG.debug("XXX checksum" +
-          " b=" + b + " off=" + off + 
-          " buffer=" + " offset=" + offset + 
+          " b=" + b + " off=" + off +
+          " buffer=" + " offset=" + offset +
           " len=" + len);
     }
     */
@@ -123,7 +121,7 @@ public class IFileOutputStream extends FilterOutputStream {
     checksum(b, off, len);
     out.write(b,off,len);
   }
- 
+
   @Override
   public void write(int b) throws IOException {
     barray[0] = (byte) (b & 0xFF);
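
IFileOutputStream interposes a checksum between IFile writers and the
underlying stream and finishes the sum on close(); the patch drops its unused
LOG field. A generic sketch of the same interposition pattern, using
java.util.zip.CRC32 purely for illustration (the real class uses Hadoop's
DataChecksum and its own trailer layout):

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.CRC32;

    class ChecksummingOutputStream extends FilterOutputStream {
      private final CRC32 sum = new CRC32();
      private boolean closed = false;

      ChecksummingOutputStream(OutputStream out) { super(out); }

      @Override public void write(byte[] b, int off, int len) throws IOException {
        sum.update(b, off, len);   // checksum the bytes, then pass them through
        out.write(b, off, len);
      }

      @Override public void write(int b) throws IOException {
        sum.update(b);
        out.write(b);
      }

      @Override public void close() throws IOException {
        if (closed) return;        // the class above also tracks a closed flag
        closed = true;
        long v = sum.getValue();
        for (int i = 7; i >= 0; i--) {
          out.write((int) (v >>> (8 * i)) & 0xFF);  // append checksum big-endian
        }
        out.close();
      }
    }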

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index e567492..adbff22 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -57,16 +57,16 @@ import org.apache.tez.engine.records.OutputContext;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class DefaultSorter extends ExternalSorter implements IndexedSortable {
-  
+
   private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
-  
+
   /**
    * The size of each record in the index file for the map-outputs.
    */
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
 
   private final static int APPROX_HEADER_LENGTH = 150;
-  
+
   // k/v accounting
   IntBuffer kvmeta; // metadata overlay on backing store
   int kvstart;            // marks origin of spill metadata
@@ -118,15 +118,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   }
 
   @Override
-  public void initialize(Configuration conf, Master master) 
+  public void initialize(Configuration conf, Master master)
       throws IOException, InterruptedException {
     if (task == null) {
       LOG.info("Bailing!", new IOException());
       return;
     }
-    
+
     super.initialize(conf, master);
-    
+
     // sanity checks
     final float spillper = job.getFloat(
         TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
@@ -144,7 +144,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
     indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
-    
+
     // buffers and accounting
     int maxMemUsage = sortmb << 20;
     maxMemUsage -= maxMemUsage % METASIZE;
@@ -169,8 +169,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
     // k/v serialization
     valSerializer.open(bb);
-    keySerializer.open(bb);   
-    
+    keySerializer.open(bb);
+
     spillInProgress = false;
     minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
     spillThread.setDaemon(true);
@@ -193,7 +193,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   }
 
   @Override
-  public void write(Object key, Object value) 
+  public void write(Object key, Object value)
       throws IOException, InterruptedException {
     collect(
         key, value, partitioner.getPartition(key, value, partitions));
@@ -673,7 +673,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
             spillLock.unlock();
             sortAndSpill();
           } catch (Throwable t) {
-            LOG.warn("ZZZZ: Got an exception in sortAndSpill", t);
+            LOG.warn("Got an exception in sortAndSpill", t);
             sortSpillException = t;
           } finally {
             spillLock.lock();
@@ -727,23 +727,23 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   int getMetaStart() {
     return kvend / NMETA;
   }
-  
+
   int getMetaEnd() {
     return 1 + // kvend is a valid record
         (kvstart >= kvend
         ? kvstart
         : kvmeta.capacity() + kvstart) / NMETA;
   }
-  
-  protected void sortAndSpill() 
+
+  protected void sortAndSpill()
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
     final int mend = getMetaEnd();
     sorter.sort(this, mstart, mend, runningTaskContext.getTaskReporter());
-    spill(mstart, mend); 
+    spill(mstart, mend);
   }
-  
-  protected void spill(int mstart, int mend) 
+
+  protected void spill(int mstart, int mend)
       throws IOException, InterruptedException {
 
     //approximate the length of the output file to be the length of the
@@ -775,7 +775,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                 kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
               final int kvoff = offsetFor(spindex);
               key.reset(
-                  kvbuffer, 
+                  kvbuffer,
                   kvmeta.get(kvoff + KEYSTART),
                   (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
                   );
@@ -795,7 +795,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
             if (spstart != spindex) {
               TezRawKeyValueIterator kvIter =
                 new MRResultIterator(spstart, spindex);
-              LOG.info("DEBUG: Running combine processor");
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Running combine processor");
+              }
               runCombineProcessor(kvIter, writer);
             }
           }
@@ -804,10 +806,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           writer.close();
 
           // record offsets
-          final TezIndexRecord rec = 
+          final TezIndexRecord rec =
               new TezIndexRecord(
-                  segmentStart, 
-                  writer.getRawLength(), 
+                  segmentStart,
+                  writer.getRawLength(),
                   writer.getCompressedLength());
           spillRec.putIndex(rec, i);
 
@@ -870,10 +872,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           writer.close();
 
           // record offsets
-          TezIndexRecord rec = 
+          TezIndexRecord rec =
               new TezIndexRecord(
-                  segmentStart, 
-                  writer.getRawLength(), 
+                  segmentStart,
+                  writer.getRawLength(),
                   writer.getCompressedLength());
           spillRec.putIndex(rec, i);
 
@@ -913,7 +915,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
       return vallen;
   }
-  
+
   /**
    * Given an offset, populate vbytes with the associated set of
    * deserialized value bytes. Should only be called during a spill.
@@ -936,7 +938,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     public InMemValBytes(int bufvoid) {
       this.bufvoid = bufvoid;
     }
-    
+
     public void reset(byte[] buffer, int start, int length) {
       this.buffer = buffer;
       this.start = start;
@@ -957,7 +959,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   InMemValBytes createInMemValBytes() {
     return new InMemValBytes(bufvoid);
   }
-  
+
   protected class MRResultIterator implements TezRawKeyValueIterator {
     private final DataInputBuffer keybuf = new DataInputBuffer();
     private final InMemValBytes vbytes = createInMemValBytes();
@@ -1031,7 +1033,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
     if (numSpills == 0) {
       //create dummy files
-      
+
       TezSpillRecord sr = new TezSpillRecord(partitions);
       try {
         for (int i = 0; i < partitions; i++) {
@@ -1039,11 +1041,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           Writer writer =
             new Writer(job, finalOut, keyClass, valClass, codec, null);
           writer.close();
-          
-          TezIndexRecord rec = 
+
+          TezIndexRecord rec =
               new TezIndexRecord(
-                  segmentStart, 
-                  writer.getRawLength(), 
+                  segmentStart,
+                  writer.getRawLength(),
                   writer.getCompressedLength());
           sr.putIndex(rec, i);
         }
@@ -1057,7 +1059,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     else {
       sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
       TezMerger.considerFinalMergeForProgress();
-      
+
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
       for (int parts = 0; parts < partitions; parts++) {
         //create the segments to be merged
@@ -1074,13 +1076,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           if (LOG.isDebugEnabled()) {
             LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                 "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
-                indexRecord.getRawLength() + ", " + 
+                indexRecord.getRawLength() + ", " +
                 indexRecord.getPartLength() + ")");
           }
         }
 
-        int mergeFactor = 
-            job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
+        int mergeFactor =
+            job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
                 TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
         // sort the segments only if there are intermediate merges
         boolean sortSegments = segmentList.size() > mergeFactor;
@@ -1089,9 +1091,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                        keyClass, valClass, codec,
                        segmentList, mergeFactor,
                        new Path(mapId.toString()),
-                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job), 
+                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job),
                        runningTaskContext.getTaskReporter(), sortSegments,
-                       null, spilledRecordsCounter, 
+                       null, spilledRecordsCounter,
                        sortPhase.phase());
 
         //write merged output to disk
@@ -1109,10 +1111,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
         sortPhase.startNextPhase();
         // record offsets
-        final TezIndexRecord rec = 
+        final TezIndexRecord rec =
             new TezIndexRecord(
-                segmentStart, 
-                writer.getRawLength(), 
+                segmentStart,
+                writer.getRawLength(),
                 writer.getCompressedLength());
         spillRec.putIndex(rec, parts);
       }
@@ -1123,7 +1125,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       }
     }
   }
-  
+
   @Override
   public OutputContext getOutputContext() {
     return null;
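
DefaultSorter's kvmeta is an IntBuffer overlay on the same byte array that
holds the serialized records, so initialize() sizes the buffer as an exact
multiple of the metadata record size before splitting it. A worked example of
that arithmetic (METASIZE = 16 is an assumption: NMETA = 4 ints of 4 bytes
each):

    class SortBufferSizingSketch {
      public static void main(String[] args) {
        final int METASIZE = 16;                // assumed metadata record size
        int sortmb = 100;                       // hypothetical sort buffer, MB
        int maxMemUsage = sortmb << 20;         // 100 MB = 104857600 bytes
        maxMemUsage -= maxMemUsage % METASIZE;  // round down so no metadata
                                                // record straddles the end
        System.out.println(maxMemUsage);        // -> 104857600 (already aligned)
      }
    }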

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index 5fb6519..b7874f0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -55,20 +55,22 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   // assume configured to $localdir/usercache/$user/appcache/$appId
-  private LocalDirAllocator lDirAlloc = 
+  private LocalDirAllocator lDirAlloc =
     new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
   private Path getAttemptOutputDir() {
-    LOG.info("DEBUG: getAttemptOutputDir: "
-        + Constants.TASK_OUTPUT_DIR + "/"
-        + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAttemptOutputDir: "
+          + Constants.TASK_OUTPUT_DIR + "/"
+          + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    }
     return new Path(Constants.TASK_OUTPUT_DIR,
         conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
   }
-  
+
   /**
    * Return the path to local map output file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
@@ -80,13 +82,13 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Create a local map output file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
   public Path getOutputFileForWrite(long size) throws IOException {
-    Path attemptOutput = 
+    Path attemptOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
     return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
   }
@@ -103,7 +105,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Return the path to a local map output index file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
@@ -116,7 +118,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Create a local map output index file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
@@ -142,7 +144,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Return a local map spill file created earlier.
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
@@ -155,7 +157,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Create a local map spill file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
@@ -170,7 +172,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Return a local map spill index file created earlier
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
@@ -183,7 +185,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Create a local map spill index file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
@@ -198,10 +200,10 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Return a local reduce input file created earlier
-   * 
+   *
    * @param mapId a map task id
    * @return path
-   * @throws IOException 
+   * @throws IOException
    */
   public Path getInputFile(int mapId) throws IOException {
     throw new UnsupportedOperationException("Incompatible with LocalRunner");
@@ -209,7 +211,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   /**
    * Create a local reduce input file name.
-   * 
+   *
    * @param mapId a map task id
    * @param size the size of the file
    * @return path
@@ -235,5 +237,5 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public Configuration getConf() {
     return conf;
   }
-  
+
 }
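
All of the path helpers above funnel through the LocalDirAllocator, which
spreads files across the comma-separated directories named by a configuration
key and picks one with enough free space per request. A sketch of the pattern
(the key and paths are hypothetical; the real key is TezJobConfig.LOCAL_DIRS):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    class LocalDirAllocSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("tez.local.dirs", "/tmp/disk1,/tmp/disk2");  // assumed key
        LocalDirAllocator alloc = new LocalDirAllocator("tez.local.dirs");
        // Chooses a directory with enough free space for the requested size.
        Path out = alloc.getLocalPathForWrite("output/file.out", 1 << 20, conf);
        System.out.println(out);
      }
    }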

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
index 1d2360b..a954f6e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
@@ -30,28 +30,28 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 public class LocalOnFileSorterOutput extends OnFileSortedOutput {
 
   private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
-  
+
   public LocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
     super(task);
   }
 
   @Override
   public void close() throws IOException, InterruptedException {
-    LOG.info("XXX close");
-
+    LOG.debug("Closing LocalOnFileSorterOutput");
     super.close();
 
-
     TezTaskOutput mapOutputFile = sorter.getMapOutput();
     FileSystem localFs = FileSystem.getLocal(mapOutputFile.getConf());
 
     Path src = mapOutputFile.getOutputFile();
-    Path dst = 
+    Path dst =
         mapOutputFile.getInputFileForWrite(
             sorter.getTaskAttemptId().getTaskID(),
             localFs.getFileStatus(src).getLen());
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming src = " + src + ", dst = " + dst);
+    }
     localFs.rename(src, dst);
-    LOG.info("XXX renaming src = " + src + ", dst = " + dst);
   }
 }
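
close() above promotes the finished map output: once the sorter is done, the
output file is renamed on the local filesystem to the path the local reduce
side will read as its input. A sketch of that step with hypothetical paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class LocalRenameSketch {
      public static void main(String[] args) throws Exception {
        FileSystem localFs = FileSystem.getLocal(new Configuration());
        Path src = new Path("/tmp/attempt_00_0/file.out");  // map output
        Path dst = new Path("/tmp/task_00/map_0.out");      // reduce input
        localFs.rename(src, dst);
      }
    }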

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
index 900c2f0..1946a62 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
@@ -62,7 +62,7 @@ public class RuntimeUtils {
 
   public static RuntimeTask createRuntimeTask(
       TezEngineTaskContext taskContext) {
-    LOG.info("TaskContext"
+    LOG.info("Creating a runtime task from TaskContext"
         + ", Processor: " + taskContext.getProcessorName()
         + ", InputCount=" + taskContext.getInputSpecList().size()
         + ", OutputCount=" + taskContext.getOutputSpecList().size());
@@ -78,38 +78,45 @@ public class RuntimeUtils {
       Input[] inputs;
       Output[] outputs;
       if (taskContext.getInputSpecList().isEmpty()) {
-        LOG.info("Initializing task with 0 inputs");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initializing task with 0 inputs");
+        }
         inputs = new Input[0];
       } else {
         int iSpecCount = taskContext.getInputSpecList().size();
         inputs = new Input[iSpecCount];
         for (int i = 0; i < iSpecCount; ++i) {
           InputSpec inSpec = taskContext.getInputSpecList().get(i);
-          LOG.info("XXXX Using Input"
-              + ", index=" + i
-              + ", inputClass=" + inSpec.getInputClassName());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Using Input"
+                + ", index=" + i
+                + ", inputClass=" + inSpec.getInputClassName());
+          }
           Class<?> inputClazz = Class.forName(inSpec.getInputClassName());
           Input input = (Input) getNewInstance(inputClazz, taskContext);
           inputs[i] = input;
         }
       }
       if (taskContext.getOutputSpecList().isEmpty()) {
-        LOG.info("Initializing task with 0 outputs");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initializing task with 0 outputs");
+        }
         outputs = new Output[0];
       } else {
         int oSpecCount = taskContext.getOutputSpecList().size();
         outputs = new Output[oSpecCount];
         for (int i = 0; i < oSpecCount; ++i) {
           OutputSpec outSpec = taskContext.getOutputSpecList().get(i);
-          LOG.info("XXXX Using Output"
-              + ", index=" + i
-              + ", output=" + outSpec.getOutputClassName());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Using Output"
+                + ", index=" + i
+                + ", output=" + outSpec.getOutputClassName());
+          }
           Class<?> outputClazz = Class.forName(outSpec.getOutputClassName());
           Output output = (Output) getNewInstance(outputClazz, taskContext);
           outputs[i] = output;
         }
       }
-      // t = new RuntimeTask(taskContext, processor, inputs, outputs);
       t = createRuntime(taskContext, processor, inputs, outputs);
     } catch (ClassNotFoundException e) {
       throw new YarnException("Unable to initialize RuntimeTask, context="
@@ -117,7 +124,7 @@ public class RuntimeUtils {
     }
     return t;
   }
-  
+
   private static RuntimeTask createRuntime(TezEngineTaskContext taskContext,
       Processor processor, Input[] inputs, Output[] outputs) {
     try {
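
createRuntimeTask above instantiates each Input and Output named in the
task's spec lists by reflection. The getNewInstance helper is not shown in
this hunk; a hypothetical sketch of the wiring, assuming a single-argument
constructor taking the task context:

    import java.lang.reflect.Constructor;

    class ReflectiveWiringSketch {
      // Look the class up by the name carried in the spec, then invoke the
      // constructor taking the task context (signature assumed).
      static Object newInstance(String className, Object taskContext)
          throws Exception {
        Class<?> clazz = Class.forName(className);
        Constructor<?> ctor = clazz.getConstructor(taskContext.getClass());
        return ctor.newInstance(taskContext);
      }
    }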

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 5dade2b..e930044 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -106,7 +106,7 @@ public class LocalJobRunner implements ClientProtocol {
   private AtomicInteger map_tasks = new AtomicInteger(0);
   private int reduce_tasks = 0;
   final Random rand = new Random();
-  
+
   private LocalJobRunnerMetrics myMetrics = null;
 
   private static final String jobDir =  "localRunner/";
@@ -129,7 +129,7 @@ public class LocalJobRunner implements ClientProtocol {
     // This is analogous to JobTracker's system directory.
     private Path systemJobDir;
     private Path systemJobFile;
-    
+
     // The job directory for the task.  Analagous to a task's job directory.
     private Path localJobDir;
     private Path localJobFile;
@@ -149,13 +149,13 @@ public class LocalJobRunner implements ClientProtocol {
     private JobProfile profile;
     private FileSystem localFs;
     boolean killed = false;
-    
+
     private LocalDistributedCacheManager localDistributedCacheManager;
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
     }
-    
+
     @Override
     public ProtocolSignature getProtocolSignature(String protocol,
         long clientVersion, int clientMethodsHash) throws IOException {
@@ -176,7 +176,7 @@ public class LocalJobRunner implements ClientProtocol {
       // this will trigger localFile to be re-written again.
       localDistributedCacheManager = new LocalDistributedCacheManager();
       localDistributedCacheManager.setup(conf);
-      
+
       // Write out configuration file.  Instead of copying it from
       // systemJobFile, we re-write it, since setup(), above, may have
       // updated it.
@@ -193,11 +193,11 @@ public class LocalJobRunner implements ClientProtocol {
         setContextClassLoader(localDistributedCacheManager.makeClassLoader(
                 getContextClassLoader()));
       }
-      
-      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
+
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
                                "http://localhost:8080/", job.getJobName());
-      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
-          profile.getUser(), profile.getJobName(), profile.getJobFile(), 
+      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
+          profile.getUser(), profile.getJobName(), profile.getJobFile(),
           profile.getURL().toString());
 
       jobs.put(id, this);
@@ -235,7 +235,7 @@ public class LocalJobRunner implements ClientProtocol {
           TaskAttemptID mapId = new TaskAttemptID(new TaskID(
               jobId, TaskType.MAP, taskId), 0);
           LOG.info("Starting task: " + mapId);
-          final String user = 
+          final String user =
               UserGroupInformation.getCurrentUser().getShortUserName();
           setupChildMapredLocalDirs(mapId, user, localConf);
           localConf.setUser(user);
@@ -244,7 +244,7 @@ public class LocalJobRunner implements ClientProtocol {
               IDConverter.fromMRTaskAttemptId(mapId);
           mapIds.add(mapId);
           // FIXME invalid task context
-          TezEngineTaskContext taskContext = 
+          TezEngineTaskContext taskContext =
               new TezEngineTaskContext(
                   tezMapId, user, localConf.getJobName(), "TODO_vertexName",
                   MapProcessor.class.getName(),
@@ -346,7 +346,7 @@ public class LocalJobRunner implements ClientProtocol {
       return executor;
     }
 
-    private org.apache.hadoop.mapreduce.OutputCommitter 
+    private org.apache.hadoop.mapreduce.OutputCommitter
     createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
       org.apache.hadoop.mapreduce.OutputCommitter committer = null;
 
@@ -358,7 +358,7 @@ public class LocalJobRunner implements ClientProtocol {
             new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
         org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
             new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
-        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
             new TaskAttemptContextImpl(conf, taskAttemptID);
         @SuppressWarnings("rawtypes")
         OutputFormat outputFormat =
@@ -377,7 +377,7 @@ public class LocalJobRunner implements ClientProtocol {
     public void run() {
       JobID jobId = profile.getJobID();
       JobContext jContext = new JobContextImpl(job, jobId);
-      
+
       org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
       try {
         outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
@@ -385,9 +385,9 @@ public class LocalJobRunner implements ClientProtocol {
         LOG.info("Failed to createOutputCommitter", e);
         return;
       }
-      
+
       try {
-        TaskSplitMetaInfo[] taskSplitMetaInfos = 
+        TaskSplitMetaInfo[] taskSplitMetaInfos =
           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
 
         int numReduceTasks = job.getNumReduceTasks();
@@ -440,7 +440,7 @@ public class LocalJobRunner implements ClientProtocol {
         LOG.info("Starting task: " + reduceId);
         try {
           if (numReduceTasks > 0) {
-            String user = 
+            String user =
                 UserGroupInformation.getCurrentUser().getShortUserName();
             JobConf localConf = new JobConf(job);
             localConf.setUser(user);
@@ -457,14 +457,16 @@ public class LocalJobRunner implements ClientProtocol {
                 Collections.singletonList(new OutputSpec("TODO_targetVertex",
                     0, SimpleOutput.class.getName())));
 
-            // move map output to reduce input  
+            // move map output to reduce input
             for (int i = 0; i < mapIds.size(); i++) {
               if (!this.isInterrupted()) {
                 TaskAttemptID mapId = mapIds.get(i);
-                LOG.info("XXX mapId: " + i + 
-                    " LOCAL_DIR = " + 
-                    mapOutputFiles.get(mapId).getConf().get(
-                        TezJobConfig.LOCAL_DIRS));
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("XXX mapId: " + i +
+                      " LOCAL_DIR = " +
+                      mapOutputFiles.get(mapId).getConf().get(
+                          TezJobConfig.LOCAL_DIRS));
+                }
                 Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
                 TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles();
                 localOutputFile.setConf(localConf);
@@ -514,7 +516,7 @@ public class LocalJobRunner implements ClientProtocol {
 
       } catch (Throwable t) {
         try {
-          outputCommitter.abortJob(jContext, 
+          outputCommitter.abortJob(jContext,
             org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         } catch (IOException ioe) {
           LOG.info("Error cleaning up job:" + id);
@@ -547,7 +549,7 @@ public class LocalJobRunner implements ClientProtocol {
         throws IOException {
       return null;
     }
-    
+
     @Override
     public synchronized boolean statusUpdate(TezTaskAttemptID taskId,
         TezTaskStatus taskStatus) throws IOException, InterruptedException {
@@ -596,7 +598,7 @@ public class LocalJobRunner implements ClientProtocol {
      */
     @Override
     public void commitPending(TezTaskAttemptID taskid,
-                              TezTaskStatus taskStatus) 
+                              TezTaskStatus taskStatus)
     throws IOException, InterruptedException {
       statusUpdate(taskid, taskStatus);
     }
@@ -605,17 +607,17 @@ public class LocalJobRunner implements ClientProtocol {
     public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) {
       // Ignore for now
     }
-    
+
     @Override
     public boolean ping(TezTaskAttemptID taskid) throws IOException {
       return true;
     }
-    
+
     @Override
     public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
       return true;
     }
-    
+
     @Override
     public void done(TezTaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
@@ -627,25 +629,25 @@ public class LocalJobRunner implements ClientProtocol {
     }
 
     @Override
-    public synchronized void fsError(TezTaskAttemptID taskId, String message) 
+    public synchronized void fsError(TezTaskAttemptID taskId, String message)
     throws IOException {
       LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
 
     @Override
-    public void shuffleError(TezTaskAttemptID taskId, String message) 
+    public void shuffleError(TezTaskAttemptID taskId, String message)
         throws IOException {
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
-    
+
     @Override
-    public synchronized void fatalError(TezTaskAttemptID taskId, String msg) 
+    public synchronized void fatalError(TezTaskAttemptID taskId, String msg)
     throws IOException {
       LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
     }
-    
+
     @Override
-    public TezTaskDependencyCompletionEventsUpdate 
+    public TezTaskDependencyCompletionEventsUpdate
     getDependentTasksCompletionEvents(
         int fromEventIdx, int maxEventsToFetch,
         TezTaskAttemptID reduce) {
@@ -704,7 +706,7 @@ public class LocalJobRunner implements ClientProtocol {
     throw new UnsupportedOperationException("Changing job priority " +
                       "in LocalJobRunner is not supported.");
   }
-  
+
   /** Throws {@link UnsupportedOperationException} */
   public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
       boolean shouldFail) throws IOException {
@@ -722,10 +724,10 @@ public class LocalJobRunner implements ClientProtocol {
     Job job = jobs.get(JobID.downgrade(id));
     if(job != null)
       return job.status;
-    else 
+    else
       return null;
   }
-  
+
   public org.apache.hadoop.mapreduce.Counters getJobCounters(
       org.apache.hadoop.mapreduce.JobID id) {
     Job job = jobs.get(JobID.downgrade(id));
@@ -737,7 +739,7 @@ public class LocalJobRunner implements ClientProtocol {
   public String getFilesystemName() throws IOException {
     return fs.getUri().toString();
   }
-  
+
   public ClusterMetrics getClusterMetrics() {
     int numMapTasks = map_tasks.get();
     return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
@@ -748,25 +750,25 @@ public class LocalJobRunner implements ClientProtocol {
     return JobTrackerStatus.RUNNING;
   }
 
-  public long getTaskTrackerExpiryInterval() 
+  public long getTaskTrackerExpiryInterval()
       throws IOException, InterruptedException {
     return 0;
   }
 
-  /** 
-   * Get all active trackers in cluster. 
+  /**
+   * Get all active trackers in cluster.
    * @return array of TaskTrackerInfo
    */
-  public TaskTrackerInfo[] getActiveTrackers() 
+  public TaskTrackerInfo[] getActiveTrackers()
       throws IOException, InterruptedException {
     return null;
   }
 
-  /** 
-   * Get all blacklisted trackers in cluster. 
+  /**
+   * Get all blacklisted trackers in cluster.
    * @return array of TaskTrackerInfo
    */
-  public TaskTrackerInfo[] getBlacklistedTrackers() 
+  public TaskTrackerInfo[] getBlacklistedTrackers()
       throws IOException, InterruptedException {
     return null;
   }
@@ -776,10 +778,10 @@ public class LocalJobRunner implements ClientProtocol {
       , int fromEventId, int maxEvents) throws IOException {
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
-  
+
   public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {return null;}
 
-  
+
   /**
    * Returns the diagnostic information for a particular task in the given job.
    * To be implemented
@@ -794,7 +796,7 @@ public class LocalJobRunner implements ClientProtocol {
    */
   public String getSystemDir() {
     Path sysDir = new Path(
-      conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));  
+      conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
     return fs.makeQualified(sysDir).toString();
   }
 
@@ -809,7 +811,7 @@ public class LocalJobRunner implements ClientProtocol {
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
-    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
         "/tmp/hadoop/mapred/staging"));
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String user;
@@ -820,7 +822,7 @@ public class LocalJobRunner implements ClientProtocol {
     }
     return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
   }
-  
+
   public String getJobHistoryDir() {
     return null;
   }
@@ -847,7 +849,7 @@ public class LocalJobRunner implements ClientProtocol {
   }
 
   @Override
-  public org.apache.hadoop.mapreduce.QueueAclsInfo[] 
+  public org.apache.hadoop.mapreduce.QueueAclsInfo[]
       getQueueAclsForCurrentUser() throws IOException{
     return null;
   }
@@ -879,7 +881,7 @@ public class LocalJobRunner implements ClientProtocol {
   }
 
   @Override
-  public Token<DelegationTokenIdentifier> 
+  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException, InterruptedException {
     return null;
   }
@@ -896,10 +898,10 @@ public class LocalJobRunner implements ClientProtocol {
       throws IOException, InterruptedException {
     throw new UnsupportedOperationException("Not supported");
   }
-  
+
   static void setupChildMapredLocalDirs(
       TaskAttemptID taskAttemptID, String user, JobConf conf) {
-    String[] localDirs = 
+    String[] localDirs =
         conf.getTrimmedStrings(
             TezJobConfig.LOCAL_DIRS, TezJobConfig.DEFAULT_LOCAL_DIRS);
     String jobId = taskAttemptID.getJobID().toString();
@@ -912,17 +914,17 @@ public class LocalJobRunner implements ClientProtocol {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
           + getLocalTaskDir(user, jobId, taskId, isCleanup));
     }
-    LOG.info(TezJobConfig.LOCAL_DIRS + " for child : " + taskAttemptID + 
+    LOG.info(TezJobConfig.LOCAL_DIRS + " for child : " + taskAttemptID +
         " is " + childMapredLocalDir);
     conf.set(TezJobConfig.LOCAL_DIRS, childMapredLocalDir.toString());
-    conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, 
+    conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, TezTaskOutput.class);
   }
-  
+
   static final String TASK_CLEANUP_SUFFIX = ".cleanup";
   static final String SUBDIR = jobDir;
   static final String JOBCACHE = "jobcache";
-  
+
   static String getLocalTaskDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
     String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
@@ -932,6 +934,6 @@ public class LocalJobRunner implements ClientProtocol {
     }
     return taskDir;
   }
-  
-  
+
+
 }
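
getLocalTaskDir above builds the per-attempt scratch location under each
configured local dir: SUBDIR, then the user, then the jobcache subtree, with
the ".cleanup" suffix appended for cleanup attempts. A worked example with
hypothetical names (note that SUBDIR, "localRunner/", already ends in a
separator, so the built path carries a doubled slash):

    class LocalTaskDirSketch {
      public static void main(String[] args) {
        String SUBDIR = "localRunner/", JOBCACHE = "jobcache", sep = "/";
        String taskDir = SUBDIR + sep + "alice" + sep + JOBCACHE + sep
            + "job_local_0001" + sep + "attempt_local_0001_m_000000_0";
        System.out.println(taskDir);
        // -> localRunner//alice/jobcache/job_local_0001/attempt_local_0001_m_000000_0
      }
    }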

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 72246c0..0fcfe65 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -61,12 +61,14 @@ public class SplitMetaInfoReaderTez {
         conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
         MRJobConfig.JOB_SPLIT_METAINFO);
     String jobSplitFile = MRJobConfig.JOB_SPLIT;
-    
+
     File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
-    LOG.info("DEBUG: Setting up JobSplitIndex with JobSplitFile at: "
-        + file.getAbsolutePath() + ", defaultFS from conf: "
-        + FileSystem.getDefaultUri(conf));
-    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting up JobSplitIndex with JobSplitFile at: "
+          + file.getAbsolutePath() + ", defaultFS from conf: "
+          + FileSystem.getDefaultUri(conf));
+    }
+
     FileStatus fStatus = fs.getFileStatus(metaSplitFile);
     if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
       throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
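
The guard above stats the split metadata file up front and refuses to read
past a configured cap, failing fast rather than buffering unbounded metadata.
A sketch of the check (the key string and default are assumptions; the real
values come from MRJobConfig):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class SplitMetaGuardSketch {
      static void check(FileSystem fs, Path metaSplitFile, Configuration conf)
          throws IOException {
        long max = conf.getLong("split.metainfo.maxsize", 10000000L);  // assumed
        FileStatus st = fs.getFileStatus(metaSplitFile);
        if (max > 0 && st.getLen() > max) {
          throw new IOException("Split metadata size exceeded " + max);
        }
      }
    }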

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
index b8a99ee..a3880f4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
@@ -83,8 +83,9 @@ public class MRCombiner implements Processor {
   @Override
   public void process(Input[] in, Output[] out) throws IOException,
       InterruptedException {
-    LOG.info("DEBUG: Running MRCombiner"
-        + ", usingNewAPI=" + useNewApi);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running MRCombiner, usingNewAPI=" + useNewApi);
+    }
 
     CombineInput input = (CombineInput)in[0];
     CombineOutput output = (CombineOutput)out[0];
@@ -334,8 +335,6 @@ public class MRCombiner implements Processor {
           comparator,
           keyClass,
           valueClass);
-    LOG.info("DEBUG: Using combineKeyClass: "
-          + keyClass + ", combineValueClass: " + valueClass);
 
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
         reducerContext = new

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 9f1ae88..54648f5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -31,7 +31,7 @@ import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
 public class MultiStageMRConfToTezTranslator {
 
   private static final Log LOG = LogFactory.getLog(MultiStageMRConfToTezTranslator.class);
-  
+
   private enum DeprecationReason {
     DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE
   }
@@ -55,14 +55,14 @@ public class MultiStageMRConfToTezTranslator {
     int numEdges = totalStages - 1;
 
     Configuration[] allConfs = extractStageConfs(newConf, numEdges);
-    
+
     for (int i = 0; i < allConfs.length; i++) {
       setStageKeysFromBaseConf(allConfs[i], srcConf, Integer.toString(i));
       processDirectConversion(allConfs[i]);
     }
     for (int i = 0; i < allConfs.length - 1; i++) {
       processMultiStageDepreaction(allConfs[i], allConfs[i + 1]);
-      
+
     }
     // Unset unnecessary keys in the last stage. Will end up being called for
     // single stage as well which should be harmless.
@@ -163,29 +163,35 @@ public class MultiStageMRConfToTezTranslator {
     JobConf jobConf = new JobConf(baseConf);
     // Don't clobber explicit tez config.
     if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS) == null
-        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) {
+        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS)
+        == null) {
       // If this is set, but the comparator is not set, and their types differ -
       // the job will break.
       if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) {
         // Pull tis in from the baseConf
         conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf
             .getMapOutputKeyClass().getName());
-        LOG.info("XXX: Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS
-            + " for stage: " + stage
-            + " based on job level configuration. Value: "
-            + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS
+              + " for stage: " + stage
+              + " based on job level configuration. Value: "
+              + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS));
+        }
       }
     }
-    
+
     if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS) == null
-        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) {
+        && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS)
+        == null) {
       if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) {
         conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf
             .getMapOutputValueClass().getName());
-        LOG.info("XXX: Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS
-            + " for stage: " + stage
-            + " based on job level configuration. Value: "
-            + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS
+              + " for stage: " + stage
+              + " based on job level configuration. Value: "
+              + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS));
+        }
       }
     }
   }

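The hunks above show the pattern this cleanup applies throughout: debug-level statements are wrapped in an isDebugEnabled() guard so the message string is never built when debug logging is off. A minimal standalone sketch of the idiom, using commons-logging as the Tez code does (the class and method below are illustrative, not part of the patch):

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class DebugGuardSketch {
    private static final Log LOG = LogFactory.getLog(DebugGuardSketch.class);

    void logStageKey(String key, String stage, String value) {
      // The guard skips the string concatenation entirely when the debug
      // level is disabled; an unguarded LOG.debug(...) would still pay
      // for building the message on every call.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting " + key + " for stage: " + stage
            + " based on job level configuration. Value: " + value);
      }
    }
  }
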
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 276294c..5683fa1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -76,16 +76,16 @@ import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 
-public abstract class MRTask 
+public abstract class MRTask
 extends RunningTaskContext {
 
   static final Log LOG = LogFactory.getLog(MRTask.class);
-  
+
   protected JobConf jobConf;
   protected JobContext jobContext;
   protected TaskAttemptContext taskAttemptContext;
   protected OutputCommitter committer;
-  
+
   // Current counters
   transient TezCounters counters = new TezCounters();
   protected GcTimeUpdater gcUpdater;
@@ -93,10 +93,10 @@ extends RunningTaskContext {
   private long initCpuCumulativeTime = 0;
   protected TezEngineTaskContext tezEngineTaskContext;
   protected TezTaskAttemptID taskAttemptId;
-  
+
   /* flag to track whether task is done */
   AtomicBoolean taskDone = new AtomicBoolean(false);
-  
+
   /** Construct output file names so that, when an output directory listing is
    * sorted lexicographically, positions correspond to output partitions.*/
   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
@@ -104,16 +104,16 @@ extends RunningTaskContext {
     NUMBER_FORMAT.setMinimumIntegerDigits(5);
     NUMBER_FORMAT.setGroupingUsed(false);
   }
-                          
+
   private final static int MAX_RETRIES = 10;
 
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;
 
   private MRTaskReporter mrReporter;
-  
+
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
-  
+
   /**
   * A Map where key -> URI scheme and value -> FileSystemStatisticUpdater.
    */
@@ -129,7 +129,7 @@ extends RunningTaskContext {
         new MRTaskStatus(
             taskAttemptId,
             counters,
-            (taskAttemptId.getTaskID().getVertexID().getId() == 0 ? 
+            (taskAttemptId.getTaskID().getVertexID().getId() == 0 ?
                 Phase.MAP : Phase.SHUFFLE)
         );
     gcUpdater = new GcTimeUpdater(counters);
@@ -143,15 +143,15 @@ extends RunningTaskContext {
     } else {
       this.jobConf = new JobConf(conf);
     }
-    reporter = 
+    reporter =
         new TezTaskReporterImpl(this, (TezTaskUmbilicalProtocol)master);
     ((TezTaskReporterImpl)reporter).startCommunicationThread();
-    
-    jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID, 
+
+    jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
         taskAttemptId.toString());
 
     initResourceCalculatorPlugin();
-    
+
     LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
   }
 
@@ -171,7 +171,7 @@ extends RunningTaskContext {
   public TezTaskUmbilicalProtocol getUmbilical() {
     return ((TezTaskReporterImpl)reporter).getUmbilical();
   }
-  
+
   public void initTask(JobConf job, TezDAGID dagId,
       MRTaskReporter mrReporter,
       boolean useNewApi) throws IOException,
@@ -203,18 +203,18 @@ extends RunningTaskContext {
     } else {
       setCommitter(job.getOutputCommitter());
     }
-    
+
     Path outputPath = FileOutputFormat.getOutputPath(job);
     if (outputPath != null) {
       if ((getCommitter() instanceof FileOutputCommitter)) {
-        FileOutputFormat.setWorkOutputPath(job, 
+        FileOutputFormat.setWorkOutputPath(job,
             ((FileOutputCommitter)getCommitter()).getTaskAttemptPath(taskAttemptContext));
       } else {
         FileOutputFormat.setWorkOutputPath(job, outputPath);
       }
     }
     getCommitter().setupTask(taskAttemptContext);
-    
+
     partitioner = new MRPartitioner(this);
     ((MRPartitioner)partitioner).initialize(job, getTaskReporter());
 
@@ -244,7 +244,7 @@ extends RunningTaskContext {
 
   public void setState(State state) {
     // TODO Auto-generated method stub
-    
+
   }
 
   public State getState() {
@@ -261,22 +261,22 @@ extends RunningTaskContext {
   }
 
   public TezCounters getCounters() { return counters; }
-  
+
   /**
-   * Return current phase of the task. 
+   * Return current phase of the task.
   * Needs to be synchronized, as the communication thread sends the phase every second.
   * @return the current phase of the task
    */
   public synchronized TezTaskStatus.Phase getPhase(){
-    return status.getPhase(); 
+    return status.getPhase();
   }
-  
+
   /**
-   * Set current phase of the task. 
-   * @param phase task phase 
+   * Set current phase of the task.
+   * @param phase task phase
    */
   protected synchronized void setPhase(TezTaskStatus.Phase phase){
-    status.setPhase(phase); 
+    status.setPhase(phase);
   }
 
   public void setConf(JobConf jobConf) {
@@ -290,9 +290,9 @@ extends RunningTaskContext {
   /**
    * Gets a handle to the Statistics instance based on the scheme associated
    * with path.
-   * 
+   *
    * @param path the path.
-   * @param conf the configuration to extract the scheme from if not part of 
+   * @param conf the configuration to extract the scheme from if not part of
    *   the path.
    * @return a Statistics instance, or null if none is found for the scheme.
    */
@@ -313,7 +313,7 @@ extends RunningTaskContext {
   public synchronized String getOutputName() {
     return "part-" + NUMBER_FORMAT.format(taskAttemptId.getTaskID().getId());
   }
- 
+
   public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
       InterruptedException {
     TezTaskUmbilicalProtocol umbilical = getUmbilical();
@@ -400,7 +400,7 @@ extends RunningTaskContext {
         } catch (InterruptedException ie) {
           // ignore
         } catch (IOException ie) {
-          LOG.warn("Failure sending commit pending: " + 
+          LOG.warn("Failure sending commit pending: " +
               StringUtils.stringifyException(ie));
           if (--retries == 0) {
             System.exit(67);
@@ -421,7 +421,7 @@ extends RunningTaskContext {
     sendDone(umbilical);
   }
 
-  
+
   private boolean isCommitRequired() throws IOException {
     return committer.needsTaskCommit(taskAttemptContext);
   }
@@ -444,7 +444,7 @@ extends RunningTaskContext {
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt(); // interrupt ourself
       } catch (IOException ie) {
-        LOG.warn("Failure sending status update: " + 
+        LOG.warn("Failure sending status update: " +
             StringUtils.stringifyException(ie));
         if (--retries == 0) {
           throw ie;
@@ -454,9 +454,9 @@ extends RunningTaskContext {
   }
 
   /**
-   * Sends last status update before sending umbilical.done(); 
+   * Sends last status update before sending umbilical.done();
    */
-  private void sendLastUpdate() 
+  private void sendLastUpdate()
       throws IOException, InterruptedException {
     status.setOutputSize(-1l);
     // send a final status report
@@ -485,7 +485,7 @@ extends RunningTaskContext {
         }
         break;
       } catch (IOException ie) {
-        LOG.warn("Failure asking whether task can commit: " + 
+        LOG.warn("Failure asking whether task can commit: " +
             StringUtils.stringifyException(ie));
         if (--retries == 0) {
           //if it couldn't query successfully then delete the output
@@ -495,13 +495,13 @@ extends RunningTaskContext {
       }
     }
 
-    // task can Commit now  
+    // task can Commit now
     try {
       LOG.info("Task " + taskAttemptId + " is allowed to commit now");
       committer.commitTask(taskAttemptContext);
       return;
     } catch (IOException iee) {
-      LOG.warn("Failure committing: " + 
+      LOG.warn("Failure committing: " +
           StringUtils.stringifyException(iee));
      // if it couldn't commit successfully then delete the output
       discardOutput(taskAttemptContext);
@@ -509,12 +509,12 @@ extends RunningTaskContext {
     }
   }
 
-  private 
+  private
   void discardOutput(TaskAttemptContext taskContext) {
     try {
       committer.abortTask(taskContext);
     } catch (IOException ioe)  {
-      LOG.warn("Failure cleaning up: " + 
+      LOG.warn("Failure cleaning up: " +
                StringUtils.stringifyException(ioe));
     }
   }
@@ -528,7 +528,7 @@ extends RunningTaskContext {
         LOG.info("Task '" + taskAttemptId + "' done.");
         return;
       } catch (IOException ie) {
-        LOG.warn("Failure signalling completion: " + 
+        LOG.warn("Failure signalling completion: " +
                  StringUtils.stringifyException(ie));
         if (--retries == 0) {
           throw ie;
@@ -540,7 +540,7 @@ extends RunningTaskContext {
   public void updateCounters() {
     // TODO Auto-generated method stub
     // TODO TEZAM Implement.
-    Map<String, List<FileSystem.Statistics>> map = new 
+    Map<String, List<FileSystem.Statistics>> map = new
         HashMap<String, List<FileSystem.Statistics>>();
     for(Statistics stat: FileSystem.getAllStatistics()) {
       String uriScheme = stat.getScheme();
@@ -563,7 +563,7 @@ extends RunningTaskContext {
       }
       updater.updateCounters();
     }
-    
+
     gcUpdater.incrementGcCounter();
     updateResourceCounters();
   }
@@ -599,7 +599,7 @@ extends RunningTaskContext {
     counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
     counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
   }
-  
+
 
   public static String normalizeStatus(String status, Configuration conf) {
     // Check to see if the status string is too long
@@ -614,75 +614,78 @@ extends RunningTaskContext {
     }
     return status;
   }
-  
-  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
+
+  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
   org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
   createReduceContext(org.apache.hadoop.mapreduce.Reducer
                         <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                       Configuration job,
-                      TezTaskAttemptID taskId, 
+                      TezTaskAttemptID taskId,
                       final TezRawKeyValueIterator rIter,
                       org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                       org.apache.hadoop.mapreduce.Counter inputValueCounter,
-                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
+                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
                       org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
                       RawComparator<INKEY> comparator,
                       Class<INKEY> keyClass, Class<INVALUE> valueClass
   ) throws IOException, InterruptedException {
-    RawKeyValueIterator r = 
+    RawKeyValueIterator r =
         new RawKeyValueIterator() {
-          
+
           @Override
           public boolean next() throws IOException {
             return rIter.next();
           }
-          
+
           @Override
           public DataInputBuffer getValue() throws IOException {
             return rIter.getValue();
           }
-          
+
           @Override
           public Progress getProgress() {
             return rIter.getProgress();
           }
-          
+
           @Override
           public DataInputBuffer getKey() throws IOException {
             return rIter.getKey();
           }
-          
+
           @Override
           public void close() throws IOException {
             rIter.close();
           }
         };
-    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
-    reduceContext = 
+    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
+    reduceContext =
       new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
-          job, 
-          IDConverter.toMRTaskAttemptId(taskId), 
-          r, 
-          inputKeyCounter, 
-          inputValueCounter, 
-          output, 
-          committer, 
-          reporter, 
-          comparator, 
-          keyClass, 
+          job,
+          IDConverter.toMRTaskAttemptId(taskId),
+          r,
+          inputKeyCounter,
+          inputValueCounter,
+          output,
+          committer,
+          reporter,
+          comparator,
+          keyClass,
           valueClass);
-    LOG.info("DEBUG: Using key class: " + keyClass + ", valueClass: " + valueClass);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using key class: " + keyClass
+          + ", valueClass: " + valueClass);
+    }
 
-    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
-        reducerContext = 
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+        reducerContext =
           new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
               reduceContext);
 
     return reducerContext;
   }
 
-  public void taskCleanup(TezTaskUmbilicalProtocol umbilical) 
+  public void taskCleanup(TezTaskUmbilicalProtocol umbilical)
       throws IOException, InterruptedException {
     // set phase for this task
     setPhase(TezTaskStatus.Phase.CLEANUP);
@@ -693,13 +696,13 @@ extends RunningTaskContext {
     committer.abortTask(taskAttemptContext);
   }
 
-  public void localizeConfiguration(JobConf jobConf) 
+  public void localizeConfiguration(JobConf jobConf)
       throws IOException, InterruptedException {
     jobConf.set(JobContext.TASK_ID, IDConverter
         .toMRTaskAttemptId(taskAttemptId).toString());
     jobConf.set(JobContext.TASK_ATTEMPT_ID,
         IDConverter.toMRTaskAttemptId(taskAttemptId).toString());
-    jobConf.setInt(JobContext.TASK_PARTITION, 
+    jobConf.setInt(JobContext.TASK_PARTITION,
         taskAttemptId.getTaskID().getId());
     jobConf.set(JobContext.ID, taskAttemptId.getTaskID().getVertexID().getDAGId().toString());
   }
@@ -727,11 +730,11 @@ extends RunningTaskContext {
   public JobContext getJobContext() {
     return jobContext;
   }
-  
+
   public TezTaskAttemptID getTaskAttemptId() {
     return taskAttemptId;
   }
-  
+
   public TezEngineTaskContext getTezEngineTaskContext() {
     return tezEngineTaskContext;
   }

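The umbilical calls above (commit-pending, status update, done) all share the same bounded-retry idiom: retry the RPC until it succeeds, decrementing a budget on each IOException, and rethrow (or exit) once the budget is exhausted. A standalone sketch of that idiom, with a hypothetical RemoteCall interface standing in for the real TezTaskUmbilicalProtocol:

  import java.io.IOException;

  public class BoundedRetrySketch {
    private static final int MAX_RETRIES = 10;

    interface RemoteCall {
      void invoke() throws IOException;
    }

    static void callWithRetries(RemoteCall call) throws IOException {
      int retries = MAX_RETRIES;
      while (true) {
        try {
          call.invoke();
          return;
        } catch (IOException ie) {
          // Rethrow only once the retry budget is used up, matching the
          // "if (--retries == 0) throw" shape in MRTask.
          if (--retries == 0) {
            throw ie;
          }
        }
      }
    }
  }
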
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index e2ef5b6..05b5411 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -322,12 +322,10 @@ public class YARNRunner implements ClientProtocol {
   private TaskLocationHint[] getMapLocationHintsFromInputSplits(JobID jobId,
       FileSystem fs, Configuration conf,
       String jobSubmitDir) throws IOException {
-    LOG.info("XXXX Reading splits information");
     TaskSplitMetaInfo[] splitsInfo =
         SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
             new Path(jobSubmitDir));
     int splitsCount = splitsInfo.length;
-    LOG.info("XXXX Found splits information, splitCount=" + splitsCount);
     TaskLocationHint[] locationHints =
         new TaskLocationHint[splitsCount];
     for (int i = 0; i < splitsCount; ++i) {
@@ -462,11 +460,13 @@ public class YARNRunner implements ClientProtocol {
     Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
         getInitialClasspath(conf));
 
-    LOG.info("XXXX Dumping out env for child, isMap=" + isMap);
-    for (Map.Entry<String, String> entry : environment.entrySet()) {
-      LOG.info("XXXX env entry: "
-          + entry.getKey()
-          + "=" + entry.getValue());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dumping out env for child, isMap=" + isMap);
+      for (Map.Entry<String, String> entry : environment.entrySet()) {
+        LOG.debug("Child env entry: "
+            + entry.getKey()
+            + "=" + entry.getValue());
+      }
     }
   }
 
@@ -518,11 +518,13 @@ public class YARNRunner implements ClientProtocol {
           jobLocalResources, i);
       dag.addVertex(vertices[i]);
 
-      LOG.info("XXXX Adding intermediate vertex to DAG"
-          + ", vertexName=" + vertices[i].getVertexName()
-          + ", processor=" + vertices[i].getProcessorName()
-          + ", parallelism=" + vertices[i].getParallelism()
-          + ", javaOpts=" + vertices[i].getJavaOpts());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding intermediate vertex to DAG"
+            + ", vertexName=" + vertices[i].getVertexName()
+            + ", processor=" + vertices[i].getProcessorName()
+            + ", parallelism=" + vertices[i].getParallelism()
+            + ", javaOpts=" + vertices[i].getJavaOpts());
+      }
     }
     return vertices;
   }
@@ -540,10 +542,12 @@ public class YARNRunner implements ClientProtocol {
 
     boolean isMRR = (intermediateReduces > 0);
 
-    LOG.info("XXXX Parsing job config"
-        + ", numMaps=" + numMaps
-        + ", numReduces=" + numReduces
-        + ", intermediateReduceStages=" + intermediateReduces);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsing job config"
+          + ", numMaps=" + numMaps
+          + ", numReduces=" + numReduces
+          + ", intermediateReduceStages=" + intermediateReduces);
+    }
 
     // configure map vertex
     String mapProcessor = "org.apache.tez.mapreduce.processor.map.MapProcessor";
@@ -575,11 +579,13 @@ public class YARNRunner implements ClientProtocol {
 
     mapVertex.setJavaOpts(getMapJavaOpts(jobConf));
 
-    LOG.info("XXXX Adding map vertex to DAG"
-        + ", vertexName=" + mapVertex.getVertexName()
-        + ", processor=" + mapVertex.getProcessorName()
-        + ", parallelism=" + mapVertex.getParallelism()
-        + ", javaOpts=" + mapVertex.getJavaOpts());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding map vertex to DAG"
+          + ", vertexName=" + mapVertex.getVertexName()
+          + ", processor=" + mapVertex.getProcessorName()
+          + ", parallelism=" + mapVertex.getParallelism()
+          + ", javaOpts=" + mapVertex.getJavaOpts());
+    }
     dag.addVertex(mapVertex);
 
     Vertex[] intermediateVertices = null;
@@ -618,11 +624,13 @@ public class YARNRunner implements ClientProtocol {
 
       reduceVertex.setJavaOpts(getReduceJavaOpts(jobConf));
 
-      LOG.info("XXXX Adding reduce vertex to DAG"
-          + ", vertexName=" + reduceVertex.getVertexName()
-          + ", processor=" + reduceVertex.getProcessorName()
-          + ", parallelism=" + reduceVertex.getParallelism()
-          + ", javaOpts=" + reduceVertex.getJavaOpts());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding reduce vertex to DAG"
+            + ", vertexName=" + reduceVertex.getVertexName()
+            + ", processor=" + reduceVertex.getProcessorName()
+            + ", parallelism=" + reduceVertex.getParallelism()
+            + ", javaOpts=" + reduceVertex.getJavaOpts());
+      }
       dag.addVertex(reduceVertex);
 
       EdgeProperty edgeProperty =
@@ -672,7 +680,9 @@ public class YARNRunner implements ClientProtocol {
     Map<String, String> mrParamToDAGParamMap = DeprecatedKeys.getMRToDAGParamMap();
     for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
       if (mrConf.get(entry.getKey()) != null) {
-        LOG.info("DEBUG: MR->DAG Setting new key: " + entry.getValue());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("MR->DAG Setting new key: " + entry.getValue());
+        }
         dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey()));
       }
     }

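Note how the environment dump above guards the entire loop rather than each LOG.debug call, so the map iteration itself is skipped when debug logging is off. A minimal sketch of that loop-level guard (names are illustrative):

  import java.util.Map;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class EnvDumpSketch {
    private static final Log LOG = LogFactory.getLog(EnvDumpSketch.class);

    void dumpEnv(Map<String, String> environment) {
      // One guard around the loop: when debug is off, neither the
      // iteration nor the per-entry string building happens.
      if (LOG.isDebugEnabled()) {
        for (Map.Entry<String, String> entry : environment.entrySet()) {
          LOG.debug("Child env entry: " + entry.getKey() + "=" + entry.getValue());
        }
      }
    }
  }
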
