tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-1803. Support > 2gb sort buffer in pipelinedsorter (rbalamohan)
Date Thu, 22 Jan 2015 07:53:38 GMT
Repository: tez
Updated Branches:
  refs/heads/master 7311d7d40 -> 3f4e8a7bf


TEZ-1803. Support > 2gb sort buffer in pipelinedsorter (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3f4e8a7b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3f4e8a7b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3f4e8a7b

Branch: refs/heads/master
Commit: 3f4e8a7bf2e8615d38770e6b0a2c648a8d078634
Parents: 7311d7d
Author: Rajesh Balamohan <rbalamohan@hortonworks.com>
Authored: Thu Jan 22 13:23:08 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@hortonworks.com>
Committed: Thu Jan 22 13:23:08 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/sort/impl/ExternalSorter.java        |  22 +-
 .../common/sort/impl/PipelinedSorter.java       |  90 ++++++--
 .../common/sort/impl/dflt/DefaultSorter.java    |   8 +-
 .../common/sort/impl/TestPipelinedSorter.java   | 209 +++++++++++++++++++
 .../sort/impl/dflt/TestDefaultSorter.java       |   4 +-
 .../library/output/TestOnFileSortedOutput.java  |   3 +
 7 files changed, 304 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4e529c..cc86772 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1803. Support > 2gb sort buffer in pipelinedsorter.
   TEZ-1826. Add option to disable split grouping and local mode option for tez-examples.
   TEZ-1982. TezChild setupUgi should not be using environment.
   TEZ-1980. Suppress tez-dag findbugs warnings until addressed.

http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 19d60e4..a1da36a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -101,7 +101,7 @@ public abstract class ExternalSorter {
   protected final int ifileReadAheadLength;
   protected final int ifileBufferSize;
 
-  protected final int availableMemoryMb;
+  protected final long availableMemoryMb;
 
   protected final IndexedSorter sorter;
 
@@ -149,18 +149,10 @@ public abstract class ExternalSorter {
 
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
+    LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable
>> 20)));
     int assignedMb = (int) (initialMemoryAvailable >> 20);
-    if (assignedMb <= 0) {
-      if (initialMemoryAvailable > 0) { // Rounded down to 0MB - may be > 0 &&
< 1MB
-        this.availableMemoryMb = 1;
-        LOG.warn("initialAvailableMemory: " + initialMemoryAvailable
-            + " is too low. Rounding to 1 MB");
-      } else {
-        throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable);
-      }
-    } else {
-      this.availableMemoryMb = assignedMb;
-    }
+    //Let the overflow checks happen in appropriate sorter impls
+    this.availableMemoryMb = assignedMb;
 
     // sorter
     sorter = ReflectionUtils.newInstance(this.conf.getClass(
@@ -302,9 +294,9 @@ public abstract class ExternalSorter {
         conf.getInt(
             TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 
             TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT);
-    Preconditions.checkArgument(initialMemRequestMb > 0 && initialMemRequestMb
<= 2047,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
-            + " should be larger than 0 and less than or equal to 2047");
+    //Higher bound checks are done in individual sorter implementations
+    Preconditions.checkArgument(initialMemRequestMb > 0,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " should be larger than 0");
     long reqBytes = ((long) initialMemRequestMb) << 20;
     LOG.info("Requested SortBufferSize ("
         + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): "

http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 9b171ab..c1a6637 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -25,14 +25,19 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.IntBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -82,7 +87,17 @@ public class PipelinedSorter extends ExternalSorter {
   private final ProxyComparator hasher;
   // SortSpans  
   private SortSpan span;
-  private ByteBuffer largeBuffer;
+  //Maintain a bunch of ByteBuffers (each of them can hold approximately 2 GB data)
+  @VisibleForTesting
+  protected final LinkedList<ByteBuffer> bufferList = new LinkedList<ByteBuffer>();
+  private ListIterator<ByteBuffer> listIterator;
+
+  //total memory capacity allocated to sorter
+  private long capacity;
+
+  private static final int BLOCK_SIZE = 1536 << 20;
+
+
   // Merger
   private final SpanMerger merger; 
   private final ExecutorService sortmaster;
@@ -96,23 +111,42 @@ public class PipelinedSorter extends ExternalSorter {
 
   public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
+    this(outputContext,conf,numOutputs, initialMemoryAvailable, BLOCK_SIZE);
+  }
+
+  public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
+      long initialMemoryAvailable, int blockSize) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
     
     partitionBits = bitcount(partitions)+1;
    
     //sanity checks
-    final int sortmb = this.availableMemoryMb;
+    final long sortmb = this.availableMemoryMb;
     indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);
 
     // buffers and accounting
-    int maxMemUsage = sortmb << 20;
-    maxMemUsage -= maxMemUsage % METASIZE;
-    largeBuffer = ByteBuffer.allocate(maxMemUsage);
-    Preconditions.checkArgument(largeBuffer.hasArray(), "Expected array backed byte buffer");
+    long maxMemUsage = sortmb << 20;
+    Preconditions.checkArgument(blockSize > 0 && blockSize < Integer.MAX_VALUE,"Block
size should be" + " within 1 - Integer.MAX_VALUE" + blockSize);
+    long usage = sortmb << 20;
+    //Divide total memory into different blocks.
+    int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
+    LOG.info("Number of Blocks : " + numberOfBlocks
+        + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize);
+    for (int i = 0; i < numberOfBlocks; i++) {
+      Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage);
+      long size = Math.min(usage, blockSize);
+      int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
+      bufferList.add(ByteBuffer.allocate(sizeWithoutMeta));
+      capacity += sizeWithoutMeta;
+      usage -= size;
+    }
+    listIterator = bufferList.listIterator();
+
+
     LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
-    // TODO: configurable setting?
-    span = new SortSpan(largeBuffer, 1024*1024, 16, comparator);
+    Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty "
+ bufferList.size());
+    span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator);
     merger = new SpanMerger(); // SpanIterators are comparable
     final int sortThreads = 
             this.conf.getInt(
@@ -149,21 +183,28 @@ public class PipelinedSorter extends ExternalSorter {
     SortSpan newSpan = span.next();
 
     if(newSpan == null) {
+      Stopwatch stopWatch = new Stopwatch();
+      stopWatch.start();
       // sort in the same thread, do not wait for the thread pool
       merger.add(span.sort(sorter));
       spill();
+      stopWatch.stop();
+      LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+      //safe to reset the iterator
+      listIterator = bufferList.listIterator();
       int items = 1024*1024;
       int perItem = 16;
       if(span.length() != 0) {
         items = span.length();
         perItem = span.kvbuffer.limit()/items;
-        items = (largeBuffer.capacity())/(METASIZE+perItem);
+        items = (int) ((span.capacity)/(METASIZE+perItem));
         if(items > 1024*1024) {
             // our goal is to have 1M splits and sort early
             items = 1024*1024;
         }
       }
-      span = new SortSpan(largeBuffer, items, perItem, this.comparator);
+      Preconditions.checkArgument(listIterator.hasNext(), "block iterator should not be empty");
+      span = new SortSpan((ByteBuffer)listIterator.next().clear(), (1024*1024), perItem,
this.comparator);
     } else {
       // queue up the sort
       SortTask task = new SortTask(span, sorter);
@@ -176,7 +217,7 @@ public class PipelinedSorter extends ExternalSorter {
   }
 
   @Override
-  public void write(Object key, Object value) 
+  public void write(Object key, Object value)
       throws IOException {
     collect(
         key, value, partitioner.getPartition(key, value, partitions));
@@ -242,7 +283,7 @@ public class PipelinedSorter extends ExternalSorter {
 
   public void spill() throws IOException { 
     // create spill file
-    final long size = largeBuffer.capacity()
+    final long size = capacity +
         + (partitions * APPROX_HEADER_LENGTH);
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
     final Path filename =
@@ -307,7 +348,8 @@ public class PipelinedSorter extends ExternalSorter {
     spill();
     sortmaster.shutdown();
 
-    largeBuffer = null;
+    //safe to clean up
+    bufferList.clear();
 
     numAdditionalSpills.increment(numSpills - 1);
 
@@ -464,9 +506,12 @@ public class PipelinedSorter extends ExternalSorter {
 
     private int index = 0;
     private long eq = 0;
+    private boolean reinit = false;
+    private int capacity;
+
 
     public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator)
{
-      int capacity = source.remaining(); 
+      capacity = source.remaining();
       int metasize = METASIZE*maxItems;
       int dataSize = maxItems * perItem;
       if(capacity < (metasize+dataSize)) {
@@ -552,10 +597,18 @@ public class PipelinedSorter extends ExternalSorter {
     public SortSpan next() {
       ByteBuffer remaining = end();
       if(remaining != null) {
+        SortSpan newSpan = null;
         int items = length();
         int perItem = kvbuffer.position()/items;
-        SortSpan newSpan = new SortSpan(remaining, items, perItem, this.comparator);
+        if (reinit) { //next mem block
+          //quite possible that the previous span had a length of 1. It is better to reinit
here for new span.
+          items = 1024*1024;
+          perItem = 16;
+        }
+        newSpan = new SortSpan(remaining, items, perItem, this.comparator);
         newSpan.index = index+1;
+        LOG.info(String.format("New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
+            .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
         return newSpan;
       }
       return null;
@@ -578,6 +631,13 @@ public class PipelinedSorter extends ExternalSorter {
       int perItem = kvbuffer.position()/items;
       LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
       if(remaining.remaining() < METASIZE+perItem) {
+        //Check if we can get the next Buffer from the main buffer list
+        if (listIterator.hasNext()) {
+          LOG.info("Getting memory from next block in the list, recordsWritten=" +
+              mapOutputRecordCounter.getValue());
+          reinit = true;
+          return listIterator.next();
+        }
         return null;
       }
       return remaining;

http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index b99f319..56a3f27 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -123,7 +123,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
     final float spillper = this.conf.getFloat(
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
-    final int sortmb = this.availableMemoryMb;
+    final int sortmb = (int) availableMemoryMb;
+    if (sortmb <= 0) {
+      throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable);
+    }
+    Preconditions.checkArgument(sortmb > 0 && sortmb <= 2047,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
+            + " for DefaultSorter should be larger than 0 and less than or equal to 2047");
     Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float)
0.0,
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT
             + " should be greater than 0 and less than or equal to 1");

http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
new file mode 100644
index 0000000..7ba0bf4
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link PipelinedSorter}: end-to-end sort correctness, and the division of the
+ * sort buffer into multiple blocks that allows sort buffers larger than 2 GB.
+ */
+public class TestPipelinedSorter {
+  private static final Configuration conf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  private int numOutputs;
+  private long initialAvailableMem;
+  private OutputContext outputContext;
+
+  //TODO: Need to make it nested structure so that multiple partition cases can be validated
+  //Expected output of the last writeData() call; TreeMap keeps it in sorted key order.
+  private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap();
+
+  static {
+    conf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(conf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")),
+          TestPipelinedSorter.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  public void setup() {
+    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    this.outputContext = createMockOutputContext(counters, appId, uniqueId);
+
+    //To enable PipelinedSorter, set 2 threads
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+
+    //Setup localdirs
+    String localDirs = workDir.toString();
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(workDir, true);
+    sortedDataMap.clear();
+  }
+
+  @Test
+  public void basicTest() throws IOException {
+    //TODO: need to support multiple partition testing later
+
+    //# partition, # of keys, size per key, InitialMem, blockSize
+    basicTest(1, 100000, 100, (10 * 1024L * 1024L), 3 << 20);
+  }
+
+  /**
+   * Writes {@code numKeys} random key/value pairs through a PipelinedSorter and checks that
+   * the final output file holds exactly those records, in sorted key order.
+   */
+  public void basicTest(int partitions, int numKeys, int keySize,
+      long initialAvailableMem, int blockSize) throws IOException {
+    this.numOutputs = partitions;
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, blockSize);
+
+    //Write numKeys random keys and values, each keySize hex characters long
+    writeData(sorter, numKeys, keySize);
+
+    Path outputFile = sorter.finalOutputFile;
+    FileSystem fs = outputFile.getFileSystem(conf);
+
+    IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096);
+    try {
+      //Verify dataset
+      verifyData(reader);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  //Its not possible to allocate > 2 GB in test environment.  Carry out basic checks here.
+  public void memTest() throws IOException {
+    //Verify if > 2 GB can be set via config. Use a copy so that the shared static conf
+    //does not keep this setting for tests that run afterwards.
+    Configuration memConf = new Configuration(conf);
+    memConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076);
+    long size = ExternalSorter.getInitialMemoryRequirement(memConf, 3076);
+    Assert.assertEquals(3076L << 20, size);
+
+    //Verify BLOCK_SIZEs: 10 MB split into 1 MB, 3 MB and 10 MB blocks (single partition)
+    this.numOutputs = 1;
+    this.initialAvailableMem = 10 * 1024 * 1024;
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 1 << 20);
+    Assert.assertEquals(10, sorter.bufferList.size());
+
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 3 << 20);
+    Assert.assertEquals(4, sorter.bufferList.size());
+
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 10 << 20);
+    Assert.assertEquals(1, sorter.bufferList.size());
+  }
+
+  private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
+    sortedDataMap.clear();
+    RandomDataGenerator generator = new RandomDataGenerator();
+    for (int i = 0; i < numKeys; i++) {
+      Text key = new Text(generator.nextHexString(keyLen));
+      Text value = new Text(generator.nextHexString(keyLen));
+      sorter.write(key, value);
+      sortedDataMap.put(key.toString(), value.toString()); //for verifying data later
+    }
+    sorter.flush();
+    sorter.close();
+  }
+
+  /**
+   * Checks that {@code reader} returns exactly the records in {@link #sortedDataMap}:
+   * same keys and values, same order, nothing missing and nothing extra.
+   */
+  private void verifyData(IFile.Reader reader)
+      throws IOException {
+    Text readKey = new Text();
+    Text readValue = new Text();
+    DataInputBuffer keyIn = new DataInputBuffer();
+    DataInputBuffer valIn = new DataInputBuffer();
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class);
+    Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class);
+    keyDeserializer.open(keyIn);
+    valDeserializer.open(valIn);
+
+    int numRecordsRead = 0;
+
+    for (Map.Entry<String, String> entry : sortedDataMap.entrySet()) {
+      //Fail fast if the output holds fewer records than were written
+      Assert.assertTrue("Missing record for key " + entry.getKey(), reader.nextRawKey(keyIn));
+      reader.nextRawValue(valIn);
+      readKey = keyDeserializer.deserialize(readKey);
+      readValue = valDeserializer.deserialize(readValue);
+      Assert.assertEquals(entry.getKey(), readKey.toString());
+      Assert.assertEquals(entry.getValue(), readValue.toString());
+      numRecordsRead++;
+    }
+    //The output must not contain more records than were written
+    Assert.assertFalse("Unexpected extra records in output", reader.nextRawKey(keyIn));
+    Assert.assertEquals(sortedDataMap.size(), numRecordsRead);
+  }
+
+  private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
+      String uniqueId) {
+    OutputContext outputContext = mock(OutputContext.class);
+    doReturn(counters).when(outputContext).getCounters();
+    doReturn(appId).when(outputContext).getApplicationId();
+    doReturn(1).when(outputContext).getDAGAttemptNumber();
+    doReturn("dagName").when(outputContext).getDAGName();
+    doReturn("destinationVertexName").when(outputContext).getDestinationVertexName();
+    doReturn(1).when(outputContext).getOutputIndex();
+    doReturn(1).when(outputContext).getTaskAttemptNumber();
+    doReturn(1).when(outputContext).getTaskIndex();
+    doReturn(1).when(outputContext).getTaskVertexIndex();
+    doReturn("vertexName").when(outputContext).getTaskVertexName();
+    doReturn(uniqueId).when(outputContext).getUniqueIdentifier();
+    Path outDirBase = new Path(workDir, "outDir_" + uniqueId);
+    String[] outDirs = new String[] { outDirBase.toString() };
+    doReturn(outDirs).when(outputContext).getWorkDirs();
+    return outputContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 16dca55..b6e3604 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -75,7 +75,7 @@ public class TestDefaultSorter {
 
     conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, 0.0f);
     try {
-      new DefaultSorter(context, conf, 10, 2048);
+      new DefaultSorter(context, conf, 10, (10 * 1024 * 1024l));
       fail();
     } catch(IllegalArgumentException e) {
       assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT));
@@ -83,7 +83,7 @@ public class TestDefaultSorter {
 
     conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, 1.1f);
     try {
-      new DefaultSorter(context, conf, 10, 2048);
+      new DefaultSorter(context, conf, 10, (10 * 1024 * 1024l));
       fail();
     } catch(IllegalArgumentException e) {
       assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT));

http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 0d1a9c0..b9ff7ef 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -36,6 +36,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -164,7 +165,9 @@ public class TestOnFileSortedOutput {
     doReturn(payLoad).when(context).getUserPayload();
     sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
     try {
+      //Memory limit checks are done in sorter impls. For e.g, defaultsorter does not support
> 2GB
       sortedOutput.initialize();
+      DefaultSorter sorter = new DefaultSorter(context, conf, 100, 3500*1024*1024l);
       fail();
     } catch(IllegalArgumentException e) {
       assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));


Mime
View raw message