tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results (rbalamohan)
Date Thu, 26 Jan 2017 00:29:25 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 4a2316ba8 -> 1a5595032


TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results (rbalamohan)

(cherry picked from commit abab526940f6353866d866b93d6da685edfa6014)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a559503
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a559503
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a559503

Branch: refs/heads/branch-0.7
Commit: 1a5595032dbd6ddd0396dc3b0b3e251e603b06f4
Parents: 4a2316b
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Jan 25 13:02:53 2017 +0530
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jan 25 16:29:18 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/sort/impl/PipelinedSorter.java       | 20 ++++++++++++++++----
 2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1a559503/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7204600..1a68c05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results
   TEZ-3559. TEZ_LIB_URIS doesn't work with schemes different than the defaultFS
   TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
   TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez

http://git-wip-us.apache.org/repos/asf/tez/blob/1a559503/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index be9b10c..f4daf1c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -258,6 +258,7 @@ public class PipelinedSorter extends ExternalSorter {
     } else {
       // queue up the sort
       SortTask task = new SortTask(span, sorter);
+      LOG.debug("Submitting span={} for sort", span.toString());
       Future<SpanIterator> future = sortmaster.submit(task);
       merger.add(future);
       span = newSpan;
@@ -834,8 +835,15 @@ public class PipelinedSorter extends ExternalSorter {
           items = 1024*1024;
           perItem = 16;
         }
-        newSpan = new SortSpan(remaining, items, perItem,
-            ConfigUtils.getIntermediateOutputKeyComparator(conf));
+        final RawComparator newComparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
+        if (this.comparator == newComparator) {
+          LOG.warn("Same comparator used. comparator={}, newComparator={},"
+                  + " hashCode: comparator={}, newComparator={}",
+              this.comparator, newComparator,
+              System.identityHashCode(this.comparator),
+              System.identityHashCode(newComparator));
+        }
+        newSpan = new SortSpan(remaining, items, perItem, newComparator);
         newSpan.index = index+1;
         LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
             .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
@@ -1142,6 +1150,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
 
     public final boolean ready() throws IOException, InterruptedException {
+      int numSpanItr = futures.size();
       try {
         SpanIterator iter = null;
         while(this.futures.size() > 0) {
@@ -1160,8 +1169,11 @@ public class PipelinedSorter extends ExternalSorter {
         LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
         return true;
       } catch(Exception e) {
-        LOG.info(outputContext.getDestinationVertexName() + ": " + e.toString());
-        return false;
+        LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={},"
+                + " futures.size={}, destVertexName={}",
+            heap.size(), total, eq, partition, gallop, numSpanItr, futures.size(),
+            outputContext.getDestinationVertexName(), e);
+        throw new IOException(e);
       }
     }
 


Mime
View raw message