tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1083. Enable IFile RLE for DefaultSorter (Rajesh Balamohan) (Cherry picked from commit c04c7b5e5a08c90e5c128885cbd685d0d14e6ae8)
Date Wed, 15 Oct 2014 05:10:49 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 e710338c4 -> ae8084633


TEZ-1083.  Enable IFile RLE for DefaultSorter  (Rajesh Balamohan)
(Cherry picked from commit c04c7b5e5a08c90e5c128885cbd685d0d14e6ae8)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ae808463
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ae808463
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ae808463

Branch: refs/heads/branch-0.5
Commit: ae8084633a38f7262ddf0fec4aff80236efa792b
Parents: e710338
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Oct 15 10:35:12 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Oct 15 10:40:15 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../common/sort/impl/dflt/DefaultSorter.java       | 17 +++++++++++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ae808463/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1912946..e1f8c58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@ ALL CHANGES:
   TEZ-1470. Recovery fails due to TaskAttemptFinishedEvent being recorded multiple times for the same task.
   TEZ-1649. ShuffleVertexManager auto reduce parallelism can cause jobs to hang indefinitely.
   TEZ-1566. Reduce log verbosity.
+  TEZ-1083. Enable IFile RLE for DefaultSorter.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ae808463/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 9e74515..82ae225 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -111,6 +111,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   private final int indexCacheMemoryLimit;
   private int totalIndexCacheMemory;
 
+  private long totalKeys = 0;
+  private long sameKey = 0;
+
   public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
@@ -304,6 +307,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
       // advance kvindex
       kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity());
+      totalKeys++;
     } catch (MapBufferTooSmallException e) {
       LOG.info("Record too large for in-memory buffer: " + e.getMessage());
       spillSingleRecord(key, value, partition);
@@ -390,12 +394,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       return kvip - kvjp;
     }
     // sort by key
-    return comparator.compare(kvbuffer,
+    int result = comparator.compare(kvbuffer,
         kvmeta.get(kvi + KEYSTART),
         kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
         kvbuffer,
         kvmeta.get(kvj + KEYSTART),
         kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
+    if (result == 0) {
+      sameKey++;
+    }
+    return result;
   }
 
   final byte META_BUFFER_TMP[] = new byte[METASIZE];
@@ -715,6 +723,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         : kvmeta.capacity() + kvstart) / NMETA;
   }
 
+  private boolean isRLENeeded() {
+    return (sameKey > (0.1 * totalKeys)) || (sameKey < 0);
+  }
+
   protected void sortAndSpill()
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
@@ -742,12 +754,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
       int spindex = mstart;
       final InMemValBytes value = createInMemValBytes();
+      boolean rle = isRLENeeded();
       for (int i = 0; i < partitions; ++i) {
         IFile.Writer writer = null;
         try {
           long segmentStart = out.getPos();
           writer = new Writer(conf, out, keyClass, valClass, codec,
-                                    spilledRecordsCounter, null);
+                                    spilledRecordsCounter, null, rle);
           if (combiner == null) {
             // spill directly
             DataInputBuffer key = new DataInputBuffer();


Mime
View raw message