tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan)
Date Tue, 22 Sep 2015 22:40:40 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 9d5e12728 -> 2a8368c66


TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan)

(cherry picked from commit 592c74204c5428a8d4d45612d58363feb5cdf3b1)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2a8368c6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2a8368c6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2a8368c6

Branch: refs/heads/branch-0.7
Commit: 2a8368c66912fe38da4590b53962565a308f277f
Parents: 9d5e127
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Sep 23 04:14:01 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Sep 23 04:15:13 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 tez-runtime-library/findbugs-exclude.xml        |  6 ----
 .../common/sort/impl/dflt/DefaultSorter.java    | 33 ++++++++++++++------
 3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2a8368c6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2138d7c..ab1c29c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2716. DefaultSorter.isRleNeeded not thread safe
   TEZ-2847. Tez UI: Task details doesn't gets updated on manual refresh after job complete
   TEZ-2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate logging in Runtime components.
   TEZ-2843. Tez UI: Show error if in progress fails due to AM not reachable

http://git-wip-us.apache.org/repos/asf/tez/blob/2a8368c6/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index aa1c7a2..45c194c 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -81,12 +81,6 @@
   </Match>
 
   <Match>
-    <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
-    <Field name="totalKeys"/>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-  </Match>
-
-  <Match>
     <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/>
     <Field name="PARSER"/>
     <Bug pattern="MS_SHOULD_BE_FINAL"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/2a8368c6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 2a10c35..ac90112 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -59,7 +59,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 import com.google.common.base.Preconditions;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+public final class DefaultSorter extends ExternalSorter implements IndexedSortable {
   
   private static final Logger LOG = LoggerFactory.getLogger(DefaultSorter.class);
 
@@ -663,7 +663,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
                   + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) +
"/" +
                   maxRec);
         }
-        sortAndSpill();
+        long sameKeyCount = 0;
+        long totalKeysCount = 0;
+        synchronized (this) {
+          sameKeyCount = sameKey;
+          totalKeysCount = totalKeys;
+        }
+        sortAndSpill(sameKeyCount, totalKeysCount);
       }
     } catch (InterruptedException e) {
       throw new IOException("Interrupted while waiting for the writer", e);
@@ -698,6 +704,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
 
   protected class SpillThread extends Thread {
 
+    volatile long totalKeysCount;
+    volatile long sameKeyCount;
+
     @Override
     public void run() {
       spillLock.lock();
@@ -710,7 +719,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
           }
           try {
             spillLock.unlock();
-            sortAndSpill();
+            sortAndSpill(sameKeyCount, totalKeysCount);
           } catch (Throwable t) {
             LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception
in sortAndSpill", t);
             sortSpillException = t;
@@ -731,6 +740,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
         spillThreadRunning = false;
       }
     }
+
+    public void setTotalKeysProcessed(long sameKeyCount, long totalKeysCount) {
+      this.sameKeyCount = sameKeyCount;
+      this.totalKeysCount = totalKeysCount;
+    }
   }
 
   private void checkSpillException() throws IOException {
@@ -757,6 +771,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
           +", kvend = " + kvend + "(" + (kvend * 4) + ")"
           + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec);
     }
+    spillThread.setTotalKeysProcessed(sameKey, totalKeys);
     spillReady.signal();
   }
 
@@ -771,16 +786,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
         : kvmeta.capacity() + kvstart) / NMETA;
   }
 
-  private boolean isRLENeeded() {
-    return (sameKey > (0.1 * totalKeys)) || (sameKey < 0);
+  private boolean isRLENeeded(long sameKeyCount, long totalKeysCount) {
+    return (sameKeyCount > (0.1 * totalKeysCount)) || (sameKeyCount < 0);
   }
 
-  protected void sortAndSpill()
+  protected void sortAndSpill(long sameKeyCount, long totalKeysCount)
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
     final int mend = getMetaEnd();
     sorter.sort(this, mstart, mend, nullProgressable);
-    spill(mstart, mend);
+    spill(mstart, mend, sameKeyCount, totalKeysCount);
   }
 
   private void adjustSpillCounters(long rawLen, long compLength) {
@@ -799,7 +814,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
     }
   }
 
-  protected void spill(int mstart, int mend)
+  protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount)
       throws IOException, InterruptedException {
 
     //approximate the length of the output file to be the length of the
@@ -819,7 +834,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
 
       int spindex = mstart;
       final InMemValBytes value = createInMemValBytes();
-      boolean rle = isRLENeeded();
+      boolean rle = isRLENeeded(sameKeyCount, totalKeysCount);
       for (int i = 0; i < partitions; ++i) {
         IFile.Writer writer = null;
         try {


Mime
View raw message