tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan)
Date Wed, 30 Sep 2015 21:50:43 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 7514b157e -> 3875a0409


TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3875a040
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3875a040
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3875a040

Branch: refs/heads/branch-0.6
Commit: 3875a04093999463262ed003716c9e5c826bd8df
Parents: 7514b15
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Oct 1 03:24:29 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Oct 1 03:24:29 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/sort/impl/dflt/DefaultSorter.java    | 33 ++++++++++++++------
 2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3875a040/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1209e22..768db48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2716. DefaultSorter.isRleNeeded not thread safe.
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2398. Flaky test: TestFaultTolerance

http://git-wip-us.apache.org/repos/asf/tez/blob/3875a040/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 69a303c..6ebc442 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -56,7 +56,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 import com.google.common.base.Preconditions;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+public final class DefaultSorter extends ExternalSorter implements IndexedSortable {
   
   private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
 
@@ -646,7 +646,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                    "); length = " + (distanceTo(kvend, kvstart,
                          kvmeta.capacity()) + 1) + "/" + maxRec);
         }
-        sortAndSpill();
+        long sameKeyCount = 0;
+        long totalKeysCount = 0;
+        synchronized (this) {
+          sameKeyCount = sameKey;
+          totalKeysCount = totalKeys;
+        }
+        sortAndSpill(sameKeyCount, totalKeysCount);
       }
     } catch (InterruptedException e) {
       throw new IOException("Interrupted while waiting for the writer", e);
@@ -679,6 +685,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
   protected class SpillThread extends Thread {
 
+    volatile long totalKeysCount;
+    volatile long sameKeyCount;
+
     @Override
     public void run() {
       spillLock.lock();
@@ -691,7 +700,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           }
           try {
             spillLock.unlock();
-            sortAndSpill();
+            sortAndSpill(sameKeyCount, totalKeysCount);
           } catch (Throwable t) {
             LOG.warn("Got an exception in sortAndSpill", t);
             sortSpillException = t;
@@ -712,6 +721,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         spillThreadRunning = false;
       }
     }
+
+    public void setTotalKeysProcessed(long sameKeyCount, long totalKeysCount) {
+      this.sameKeyCount = sameKeyCount;
+      this.totalKeysCount = totalKeysCount;
+    }
   }
 
   private void checkSpillException() throws IOException {
@@ -740,6 +754,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                "); length = " + (distanceTo(kvend, kvstart,
                      kvmeta.capacity()) + 1) + "/" + maxRec);
     }
+    spillThread.setTotalKeysProcessed(sameKey, totalKeys);
     spillReady.signal();
   }
 
@@ -754,19 +769,19 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         : kvmeta.capacity() + kvstart) / NMETA;
   }
 
-  private boolean isRLENeeded() {
-    return (sameKey > (0.1 * totalKeys)) || (sameKey < 0);
+  private boolean isRLENeeded(long sameKeyCount, long totalKeysCount) {
+    return (sameKeyCount > (0.1 * totalKeysCount)) || (sameKeyCount < 0);
   }
 
-  protected void sortAndSpill()
+  protected void sortAndSpill(long sameKeyCount, long totalKeysCount)
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
     final int mend = getMetaEnd();
     sorter.sort(this, mstart, mend, nullProgressable);
-    spill(mstart, mend);
+    spill(mstart, mend, sameKeyCount, totalKeysCount);
   }
 
-  protected void spill(int mstart, int mend)
+  protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount)
       throws IOException, InterruptedException {
 
     //approximate the length of the output file to be the length of the
@@ -785,7 +800,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
       int spindex = mstart;
       final InMemValBytes value = createInMemValBytes();
-      boolean rle = isRLENeeded();
+      boolean rle = isRLENeeded(sameKeyCount, totalKeysCount);
       for (int i = 0; i < partitions; ++i) {
         IFile.Writer writer = null;
         try {


Mime
View raw message