tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kshu...@apache.org
Subject tez git commit: TEZ-3741. Tez outputs should free memory when closed (Jason Lowe via kshukla)
Date Mon, 19 Jun 2017 13:41:17 GMT
Repository: tez
Updated Branches:
  refs/heads/master a70e16326 -> f0a9281ca


TEZ-3741. Tez outputs should free memory when closed (Jason Lowe via kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0a9281c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0a9281c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0a9281c

Branch: refs/heads/master
Commit: f0a9281caf29c763c77bf784e9095140e7ca0c8a
Parents: a70e163
Author: Kuhu Shukla <kshukla@yahoo-inc.com>
Authored: Mon Jun 19 08:38:27 2017 -0500
Committer: Kuhu Shukla <kshukla@yahoo-inc.com>
Committed: Mon Jun 19 08:38:27 2017 -0500

----------------------------------------------------------------------
 .../common/sort/impl/dflt/DefaultSorter.java    | 14 +++++-
 .../output/OrderedPartitionedKVOutput.java      |  1 +
 .../library/output/UnorderedKVOutput.java       |  1 +
 .../output/UnorderedPartitionedKVOutput.java    |  1 +
 .../sort/impl/dflt/TestDefaultSorter.java       |  2 +
 .../library/output/OutputTestHelpers.java       | 34 +++++++++++++-
 .../output/TestOrderedPartitionedKVOutput2.java | 43 +++++++++++++++++-
 .../library/output/TestUnorderedKVOutput2.java  | 47 ++++++++++++++++++++
 8 files changed, 138 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index a6f7cf7..1528076 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -78,7 +78,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   private final static int APPROX_HEADER_LENGTH = 150;
 
   // k/v accounting
-  private final IntBuffer kvmeta; // metadata overlay on backing store
+  private IntBuffer kvmeta; // metadata overlay on backing store
   int kvstart;            // marks origin of spill metadata
   int kvend;              // marks end of spill metadata
   int kvindex;            // marks end of fully serialized records
@@ -91,7 +91,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   int bufvoid;            // marks the point where we should stop
                           // reading at the end of the buffer
 
-  private final byte[] kvbuffer;        // main output buffer
+  private byte[] kvbuffer;        // main output buffer
   private final byte[] b0 = new byte[0];
 
   protected static final int VALSTART = 0;         // val offset in acct
@@ -749,6 +749,16 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     }
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    kvbuffer = null;
+    kvmeta = null;
+  }
+
+  boolean isClosed() {
+    return kvbuffer == null && kvmeta == null;
+  }
 
   protected class SpillThread extends Thread {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 98e14be..7d3e0b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -187,6 +187,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       sorter.close();
       this.endTime = System.nanoTime();
       returnEvents = generateEvents();
+      sorter = null;
     } else {
       LOG.warn(getContext().getDestinationVertexName() +
           ": Attempting to close output {} of type {} before it was started. Generating empty
events",

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 3689e5c..51521e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -127,6 +127,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     if (isStarted.get()) {
       //TODO: Do we need to support sending payloads via events?
       returnEvents = kvWriter.close();
+      kvWriter = null;
     } else {
       LOG.warn(getContext().getDestinationVertexName() +
           ": Attempting to close output {} of type {} before it was started. Generating empty
events",

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index eeca066..e83f1e9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -104,6 +104,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     List<Event> returnEvents = null;
     if (isStarted.get()) {
       returnEvents = kvWriter.close();
+      kvWriter = null;
     } else {
       LOG.warn(getContext().getDestinationVertexName() +
           ": Attempting to close output {} of type {} before it was started. Generating empty
events",

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 73d249c..b3b16d9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -280,6 +280,7 @@ public class TestDefaultSorter {
     try {
       sorter.flush();
       sorter.close();
+      assertTrue(sorter.isClosed());
       assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID));
       verifyCounters(sorter, context);
     } catch(Exception e) {
@@ -302,6 +303,7 @@ public class TestDefaultSorter {
     try {
       sorter.flush();
       sorter.close();
+      assertTrue(sorter.isClosed());
       assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID
+
           "_0"));
       verifyCounters(sorter, context);

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index db9a0ed..573d53e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -14,20 +14,29 @@
 
 package org.apache.tez.runtime.library.output;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.OutputStatisticsReporter;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
-public class OutputTestHelpers {
+class OutputTestHelpers {
   static OutputContext createOutputContext() throws IOException {
     OutputContext outputContext = mock(OutputContext.class);
     Configuration conf = new TezConfiguration();
@@ -44,4 +53,27 @@ public class OutputTestHelpers {
     doReturn(statsReporter).when(outputContext).getStatisticsReporter();
     return outputContext;
   }
+
+  static OutputContext createOutputContext(Configuration conf, Path workingDir) throws IOException
{
+    OutputContext ctx = mock(OutputContext.class);
+    doAnswer(new Answer<Void>() {
+      @Override public Void answer(InvocationOnMock invocation) throws Throwable {
+        long requestedSize = (Long) invocation.getArguments()[0];
+        MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
+            .getArguments()[1];
+        callback.memoryAssigned(requestedSize);
+        return null;
+      }
+    }).when(ctx).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
+    doReturn(TezUtils.createUserPayloadFromConf(conf)).when(ctx).getUserPayload();
+    doReturn("destinationVertex").when(ctx).getDestinationVertexName();
+    doReturn("UUID").when(ctx).getUniqueIdentifier();
+    doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs();
+    doReturn(200 * 1024 * 1024l).when(ctx).getTotalMemoryAvailableToTask();
+    doReturn(new TezCounters()).when(ctx).getCounters();
+    OutputStatisticsReporter statsReporter = mock(OutputStatisticsReporter.class);
+    doReturn(statsReporter).when(ctx).getStatisticsReporter();
+    doReturn(new ExecutionContextImpl("localhost")).when(ctx).getExecutionContext();
+    return ctx;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
index 8e76a8b..f226b7c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
@@ -23,22 +23,53 @@ import java.util.BitSet;
 import java.util.List;
 
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 // Tests which don't require parameterization
 public class TestOrderedPartitionedKVOutput2 {
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
 
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        TestUnorderedKVOutput2.class.getName()).makeQualified(
+        localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
 
   @Test(timeout = 5000)
   public void testNonStartedOutput() throws IOException {
-    OutputContext outputContext = OutputTestHelpers.createOutputContext();
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
     int numPartitions = 10;
     OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
     output.initialize();
@@ -63,5 +94,13 @@ public class TestOrderedPartitionedKVOutput2 {
     }
   }
 
-
+  @Test(timeout = 10000)
+  public void testClose() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    int numPartitions = 10;
+    OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    output.start();
+    output.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
index ecc1241..792b03f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
@@ -15,23 +15,58 @@
 package org.apache.tez.runtime.library.output;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.List;
 
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 // Tests which don't require parameterization
 public class TestUnorderedKVOutput2 {
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        TestUnorderedKVOutput2.class.getName()).makeQualified(
+            localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
 
   @Test(timeout = 5000)
   public void testNonStartedOutput() throws Exception {
@@ -57,4 +92,16 @@ public class TestUnorderedKVOutput2 {
       assertTrue(emptyPartionsBitSet.get(i));
     }
   }
+
+  @Test(timeout = 10000)
+  public void testClose() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    int numPartitions = 1;
+    UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    output.start();
+    assertNotNull(output.getWriter());
+    output.close();
+    assertNull(output.getWriter());
+  }
 }


Mime
View raw message