tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask (rbalamohan)
Date Thu, 30 Apr 2015 00:18:27 GMT
Repository: tez
Updated Branches:
  refs/heads/master ca83804f9 -> 5f63de8ee


TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f63de8e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f63de8e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f63de8e

Branch: refs/heads/master
Commit: 5f63de8eecbc8c0f487da122714f0aca1639ac4f
Parents: ca83804
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Apr 30 05:48:00 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Apr 30 05:48:00 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 50 +++++++++-
 .../runtime/api/impl/TezInputContextImpl.java   | 18 +++-
 .../runtime/api/impl/TezOutputContextImpl.java  | 16 +++-
 .../api/impl/TezProcessorContextImpl.java       | 18 +++-
 .../runtime/api/impl/TezTaskContextImpl.java    | 22 +++--
 .../TestLogicalIOProcessorRuntimeTask.java      | 96 ++++++++++++++------
 7 files changed, 176 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16a7e08..3a1867e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask.
   TEZ-2084. Tez UI: Stacktrace format info is lost in diagnostics
   TEZ-2374. Fix build break against hadoop-2.2 due to TEZ-2325.
   TEZ-2314. Tez task attempt failures due to bad event serialization

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 80c2717..f465d3c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -37,6 +38,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -77,7 +80,6 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezInputContextImpl;
 import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
-import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
@@ -97,12 +99,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final String[] localDirs;
   /** Responsible for maintaining order of Inputs */
   private final List<InputSpec> inputSpecs;
-  private final ConcurrentHashMap<String, LogicalInput> inputsMap;
-  private final ConcurrentHashMap<String, InputContext> inputContextMap;
+  private final Map<String, LogicalInput> inputsMap;
+  private final Map<String, InputContext> inputContextMap;
   /** Responsible for maintaining order of Outputs */
   private final List<OutputSpec> outputSpecs;
-  private final ConcurrentHashMap<String, LogicalOutput> outputsMap;
-  private final ConcurrentHashMap<String, OutputContext> outputContextMap;
+  private final Map<String, LogicalOutput> outputsMap;
+  private final Map<String, OutputContext> outputContextMap;
 
   private final List<GroupInputSpec> groupInputSpecs;
   private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
@@ -692,7 +694,45 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     eventRouterThread.start();
   }
 
+  private void cleanupInputOutputs() {
+    if (groupInputsMap != null) {
+      groupInputsMap.clear();
+    }
+    inputsMap.clear();
+    outputsMap.clear();
+  }
+
+  private void closeContexts() throws IOException {
+    closeContext(inputContextMap);
+    closeContext(outputContextMap);
+    closeContext(processorContext);
+  }
+
+  private void closeContext(Map<String, ? extends TaskContext> contextMap) throws IOException {
+    if (contextMap == null) {
+      return;
+    }
+
+    for(TaskContext context : contextMap.values()) {
+      closeContext(context);
+    }
+    contextMap.clear();
+  }
+
+  private void closeContext(TaskContext context) throws IOException {
+    if (context != null && (context instanceof Closeable)) {
+      ((Closeable) context).close();
+    }
+  }
+
   public synchronized void cleanup() {
+    try {
+      cleanupInputOutputs();
+      closeContexts();
+    } catch (IOException e) {
+      LOG.info("Error while cleaning up contexts ", e);
+    }
+
     LOG.info("Final Counters : " + getCounters().toShortString());
     setTaskDone();
     if (eventRouterThread != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index f6330f3..101aeb9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,16 +47,20 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TezInputContextImpl extends TezTaskContextImpl
     implements InputContext {
 
-  private final UserPayload userPayload;
+  private static final Logger LOG = LoggerFactory.getLogger(TezInputContextImpl.class);
+
+  private UserPayload userPayload;
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
   private final int inputIndex;
   private final Map<String, LogicalInput> inputs;
-  private final InputReadyTracker inputReadyTracker;
+  private InputReadyTracker inputReadyTracker;
   private final InputStatisticsReporterImpl statsReporter;
   
   class InputStatisticsReporterImpl implements InputStatisticsReporter {
@@ -161,4 +166,13 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public InputStatisticsReporter getStatisticsReporter() {
     return statsReporter;
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    this.userPayload = null;
+    this.inputReadyTracker = null;
+    inputs.clear();
+    LOG.info("Cleared TezInputContextImpl related information");
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 4045113..b46cfd2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -44,16 +45,20 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements OutputContext {
 
-  private final UserPayload userPayload;
+  private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class);
+
+  private UserPayload userPayload;
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
   private final int outputIndex;
   private final OutputStatisticsReporterImpl statsReporter;
-  
+
   class OutputStatisticsReporterImpl implements OutputStatisticsReporter {
 
     @Override
@@ -146,4 +151,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   public OutputStatisticsReporter getStatisticsReporter() {
     return statsReporter;
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    this.userPayload = null;
+    LOG.info("Cleared TezOutputContextImpl related information");
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index a74ccac..d6b3ec5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -45,12 +46,16 @@ import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl implements ProcessorContext {
 
-  private final UserPayload userPayload;
+  private static final Logger LOG = LoggerFactory.getLogger(TezProcessorContextImpl.class);
+
+  private UserPayload userPayload;
+  private InputReadyTracker inputReadyTracker;
   private final EventMetaData sourceInfo;
-  private final InputReadyTracker inputReadyTracker;
 
   public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
       TezUmbilical tezUmbilical, String dagName, String vertexName,
@@ -110,4 +115,13 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
   public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
     inputReadyTracker.waitForAllInputsReady(inputs);
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    this.userPayload = null;
+    this.inputReadyTracker = null;
+    LOG.info("Cleared TezProcessorContextImpl related information");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index a156f54..170741a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -20,6 +20,8 @@ package org.apache.tez.runtime.api.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
@@ -35,7 +37,6 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EntityDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.ObjectRegistry;
@@ -44,25 +45,24 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 import com.google.common.base.Preconditions;
 
-public abstract class TezTaskContextImpl implements TaskContext {
+public abstract class TezTaskContextImpl implements TaskContext, Closeable {
 
   private static final AtomicInteger ID_GEN = new AtomicInteger(10000);
-  
-  private final Configuration conf;
+
   protected final String taskVertexName;
   protected final TezTaskAttemptID taskAttemptID;
   private final TezCounters counters;
   private String[] workDirs;
   private String uniqueIdentifier;
-  protected final LogicalIOProcessorRuntimeTask runtimeTask;
+  protected LogicalIOProcessorRuntimeTask runtimeTask;
   protected final TezUmbilical tezUmbilical;
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final int appAttemptNumber;
   private final Map<String, String> auxServiceEnv;
-  protected final MemoryDistributor initialMemoryDistributor;
+  protected MemoryDistributor initialMemoryDistributor;
   protected final EntityDescriptor<?> descriptor;
   private final String dagName;
-  private final ObjectRegistry objectRegistry;
+  private ObjectRegistry objectRegistry;
   private final int vertexParallelism;
   private final ExecutionContext ExecutionContext;
   private final long memAvailable;
@@ -84,7 +84,6 @@ public abstract class TezTaskContextImpl implements TaskContext {
     checkNotNull(auxServiceEnv, "auxServiceEnv is null");
     checkNotNull(memDist, "memDist is null");
     checkNotNull(descriptor, "descriptor is null");
-    this.conf = conf;
     this.dagName = dagName;
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
@@ -223,4 +222,11 @@ public abstract class TezTaskContextImpl implements TaskContext {
   private int generateId() {
     return ID_GEN.incrementAndGet();
   }
+
+  @Override
+  public void close() throws IOException {
+    this.runtimeTask = null;
+    this.objectRegistry = null;
+    this.initialMemoryDistributor = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 4d165b5..df932cf 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -19,9 +19,12 @@
 package org.apache.tez.runtime;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +41,7 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
@@ -80,38 +84,78 @@ public class TestLogicalIOProcessorRuntimeTask {
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
         "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
 
-    lio1.initialize();
-    lio1.run();
-    lio1.close();
+    try {
+      lio1.initialize();
+      lio1.run();
+      lio1.close();
+
+      // Input should've been started, Output should not have been started
+      assertEquals(1, TestProcessor.runCount);
+      assertEquals(1, TestInput.startCount);
+      assertEquals(0, TestOutput.startCount);
+      assertEquals(30, TestInput.vertexParallelism);
+      assertEquals(0, TestOutput.vertexParallelism);
+      assertEquals(30, lio1.getProcessorContext().getVertexParallelism());
+      assertEquals(30, lio1.getInputContexts().iterator().next().getVertexParallelism());
+      assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism());
+    } catch(Exception e) {
+      fail();
+    } finally {
+      cleanupAndTest(lio1);
+    }
+
+
 
-    // Input should've been started, Output should not have been started
-    assertEquals(1, TestProcessor.runCount);
-    assertEquals(1, TestInput.startCount);
-    assertEquals(0, TestOutput.startCount);
-    assertEquals(30, TestInput.vertexParallelism);
-    assertEquals(0, TestOutput.vertexParallelism);
-    assertEquals(30, lio1.getProcessorContext().getVertexParallelism());
-    assertEquals(30, lio1.getInputContexts().iterator().next().getVertexParallelism());
-    assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism());
 
     LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
         "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
-    lio2.initialize();
-    lio2.run();
-    lio2.close();
-
-    // Input should not have been started again, Output should not have been started
-    assertEquals(2, TestProcessor.runCount);
-    assertEquals(1, TestInput.startCount);
-    assertEquals(0, TestOutput.startCount);
-    assertEquals(30, TestInput.vertexParallelism);
-    assertEquals(0, TestOutput.vertexParallelism);
-    //Check if parallelism is available in processor/ i/p / o/p contexts
-    assertEquals(10, lio2.getProcessorContext().getVertexParallelism());
-    assertEquals(10, lio2.getInputContexts().iterator().next().getVertexParallelism());
-    assertEquals(10, lio2.getOutputContexts().iterator().next().getVertexParallelism());
+    try {
+      lio2.initialize();
+      lio2.run();
+      lio2.close();
+
+      // Input should not have been started again, Output should not have been started
+      assertEquals(2, TestProcessor.runCount);
+      assertEquals(1, TestInput.startCount);
+      assertEquals(0, TestOutput.startCount);
+      assertEquals(30, TestInput.vertexParallelism);
+      assertEquals(0, TestOutput.vertexParallelism);
+      //Check if parallelism is available in processor/ i/p / o/p contexts
+      assertEquals(10, lio2.getProcessorContext().getVertexParallelism());
+      assertEquals(10, lio2.getInputContexts().iterator().next().getVertexParallelism());
+      assertEquals(10, lio2.getOutputContexts().iterator().next().getVertexParallelism());
+    } catch(Exception e) {
+      fail();
+    } finally {
+      cleanupAndTest(lio2);
+    }
+
+  }
+
+  private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) {
+
+    lio.cleanup();
+
+    assertTrue(lio.getProcessorContext().getUserPayload() == null);
+    assertTrue(lio.getProcessorContext().getObjectRegistry() == null);
+
+    try {
+      lio.getProcessorContext().waitForAnyInputReady(Collections.<Input>emptyList());
+      fail("Processor context should have been already cleanup");
+    } catch (Throwable t) {
+      assertTrue(t instanceof NullPointerException);
+    }
+
+    try {
+      lio.getProcessorContext().requestInitialMemory(0, null);
+      fail("Processor context should have been already cleanup");
+    } catch (Throwable t) {
+      assertTrue(t instanceof NullPointerException);
+    }
 
+    assertTrue(lio.getInputContexts().size() == 0);
+    assertTrue(lio.getOutputContexts().size() == 0);
   }
 
   private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID,


Mime
View raw message