Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7C08F17485 for ; Fri, 8 May 2015 18:08:32 +0000 (UTC) Received: (qmail 11051 invoked by uid 500); 8 May 2015 18:08:32 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 11012 invoked by uid 500); 8 May 2015 18:08:32 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 10999 invoked by uid 99); 8 May 2015 18:08:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 May 2015 18:08:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 347C3E022E; Fri, 8 May 2015 18:08:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <5bd0d089f8dc4068b253717b94c8d3dc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts. (sseth) Date: Fri, 8 May 2015 18:08:32 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 6e6ad706f -> ce69aa1e2 TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ce69aa1e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ce69aa1e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ce69aa1e Branch: refs/heads/master Commit: ce69aa1e2ca3320d33c833a96a158f94bfd73f52 Parents: 6e6ad70 Author: Siddharth Seth Authored: Fri May 8 11:08:14 2015 -0700 Committer: Siddharth Seth Committed: Fri May 8 11:08:14 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/LogicalIOProcessorRuntimeTask.java | 110 +++++++++++++------ .../runtime/api/impl/TezInputContextImpl.java | 11 +- .../runtime/api/impl/TezOutputContextImpl.java | 2 +- .../api/impl/TezProcessorContextImpl.java | 4 +- .../runtime/api/impl/TezTaskContextImpl.java | 9 +- .../apache/tez/runtime/task/TaskReporter.java | 47 +++++--- .../TestLogicalIOProcessorRuntimeTask.java | 48 +++++--- 8 files changed, 160 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 185e1b0..efb19b2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES Default max limit increased. Should not affect existing users. ALL CHANGES: + TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts. TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly TEZ-776. Reduce AM mem usage caused by storing TezEvents http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index f465d3c..1cfe538 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -96,45 +97,46 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private static final Logger LOG = LoggerFactory .getLogger(LogicalIOProcessorRuntimeTask.class); + @VisibleForTesting // All fields non private for testing. private final String[] localDirs; /** Responsible for maintaining order of Inputs */ - private final List inputSpecs; - private final Map inputsMap; - private final Map inputContextMap; + final List inputSpecs; + final ConcurrentMap inputsMap; + final ConcurrentMap inputContextMap; /** Responsible for maintaining order of Outputs */ - private final List outputSpecs; - private final Map outputsMap; - private final Map outputContextMap; + final List outputSpecs; + final ConcurrentMap outputsMap; + final ConcurrentMap outputContextMap; - private final List groupInputSpecs; - private ConcurrentHashMap groupInputsMap; + final List groupInputSpecs; + ConcurrentHashMap groupInputsMap; - private final ProcessorDescriptor processorDescriptor; - private AbstractLogicalIOProcessor processor; - private ProcessorContext processorContext; + final ProcessorDescriptor processorDescriptor; + AbstractLogicalIOProcessor processor; + ProcessorContext processorContext; private final MemoryDistributor initialMemoryDistributor; /** Maps which will be provided to the processor run method */ - private final LinkedHashMap runInputMap; - private final LinkedHashMap runOutputMap; + final LinkedHashMap runInputMap; + final LinkedHashMap runOutputMap; private final Map serviceConsumerMetadata; private final Map envMap; - private final ExecutorService initializerExecutor; + final ExecutorService initializerExecutor; private final CompletionService initializerCompletionService; private final Multimap startedInputsMap; - private LinkedBlockingQueue eventsToBeProcessed; - private Thread eventRouterThread = null; + LinkedBlockingQueue eventsToBeProcessed; + Thread eventRouterThread = null; private final int appAttemptNumber; - private final InputReadyTracker inputReadyTracker; + private volatile InputReadyTracker inputReadyTracker; - private final ObjectRegistry objectRegistry; + private volatile ObjectRegistry objectRegistry; private final ExecutionContext ExecutionContext; private final long memAvailable; @@ -143,6 +145,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { Map serviceConsumerMetadata, Map envMap, Multimap startedInputsMap, ObjectRegistry objectRegistry, String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException { + // Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method. // TODO Remove jobToken from here post TEZ-421 super(taskSpec, tezConf, tezUmbilical, pid); LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " @@ -361,6 +364,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { setTaskDone(); if (eventRouterThread != null) { eventRouterThread.interrupt(); + LOG.info("Joining on EventRouter"); + try { + eventRouterThread.join(); + } catch (InterruptedException e) { + LOG.info("Ignoring interrupt while waiting for the router thread to die"); + Thread.currentThread().interrupt(); + } + eventRouterThread = null; } } } @@ -694,14 +705,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventRouterThread.start(); } - private void cleanupInputOutputs() { - if (groupInputsMap != null) { - groupInputsMap.clear(); - } - inputsMap.clear(); - outputsMap.clear(); - } - private void closeContexts() throws IOException { closeContext(inputContextMap); closeContext(outputContextMap); @@ -725,19 +728,62 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } } - public synchronized void cleanup() { + public void cleanup() throws InterruptedException { + LOG.info("Final Counters : " + getCounters().toShortString()); + setTaskDone(); + if (eventRouterThread != null) { + eventRouterThread.interrupt(); + LOG.info("Joining on EventRouter"); + try { + eventRouterThread.join(); + } catch (InterruptedException e) { + LOG.info("Ignoring interrupt while waiting for the router thread to die"); + Thread.currentThread().interrupt(); + } + eventRouterThread = null; + } try { - cleanupInputOutputs(); closeContexts(); + // Cleanup references which may be held by misbehaved tasks. + cleanupStructures(); } catch (IOException e) { LOG.info("Error while cleaning up contexts ", e); } + } - LOG.info("Final Counters : " + getCounters().toShortString()); - setTaskDone(); - if (eventRouterThread != null) { - eventRouterThread.interrupt(); + private void cleanupStructures() { + if (initializerExecutor != null && !initializerExecutor.isShutdown()) { + initializerExecutor.shutdownNow(); + } + inputsMap.clear(); + outputsMap.clear(); + + inputSpecs.clear(); + outputSpecs.clear(); + + inputsMap.clear(); + outputsMap.clear(); + + inputContextMap.clear(); + outputContextMap.clear(); + + if (groupInputSpecs != null) { + groupInputSpecs.clear(); } + if (groupInputsMap != null) { + groupInputsMap.clear(); + groupInputsMap = null; + } + + processor = null; + processorContext = null; + + runInputMap.clear(); + runOutputMap.clear(); + + eventsToBeProcessed.clear(); + inputReadyTracker = null; + objectRegistry = null; } @Private http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index 101aeb9..8d6466a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -55,12 +55,12 @@ public class TezInputContextImpl extends TezTaskContextImpl private static final Logger LOG = LoggerFactory.getLogger(TezInputContextImpl.class); - private UserPayload userPayload; + private volatile UserPayload userPayload; private final String sourceVertexName; private final EventMetaData sourceInfo; private final int inputIndex; private final Map inputs; - private InputReadyTracker inputReadyTracker; + private volatile InputReadyTracker inputReadyTracker; private final InputStatisticsReporterImpl statsReporter; class InputStatisticsReporterImpl implements InputStatisticsReporter { @@ -159,7 +159,11 @@ public class TezInputContextImpl extends TezTaskContextImpl @Override public void inputIsReady() { - inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName)); + if (inputReadyTracker != null) { + inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName)); + } else { + LOG.warn("Ignoring Input Ready notification since the Task has already been closed"); + } } @Override @@ -172,7 +176,6 @@ public class TezInputContextImpl extends TezTaskContextImpl super.close(); this.userPayload = null; this.inputReadyTracker = null; - inputs.clear(); LOG.info("Cleared TezInputContextImpl related information"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index b46cfd2..71e96db 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -53,7 +53,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class); - private UserPayload userPayload; + private volatile UserPayload userPayload; private final String destinationVertexName; private final EventMetaData sourceInfo; private final int outputIndex; http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index d6b3ec5..a191ae8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -53,8 +53,8 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce private static final Logger LOG = LoggerFactory.getLogger(TezProcessorContextImpl.class); - private UserPayload userPayload; - private InputReadyTracker inputReadyTracker; + private volatile UserPayload userPayload; + private volatile InputReadyTracker inputReadyTracker; private final EventMetaData sourceInfo; public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 170741a..5f04c80 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -54,15 +54,15 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { private final TezCounters counters; private String[] workDirs; private String uniqueIdentifier; - protected LogicalIOProcessorRuntimeTask runtimeTask; + protected final LogicalIOProcessorRuntimeTask runtimeTask; protected final TezUmbilical tezUmbilical; private final Map serviceConsumerMetadata; private final int appAttemptNumber; private final Map auxServiceEnv; - protected MemoryDistributor initialMemoryDistributor; + protected volatile MemoryDistributor initialMemoryDistributor; protected final EntityDescriptor descriptor; private final String dagName; - private ObjectRegistry objectRegistry; + private volatile ObjectRegistry objectRegistry; private final int vertexParallelism; private final ExecutionContext ExecutionContext; private final long memAvailable; @@ -225,7 +225,8 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { @Override public void close() throws IOException { - this.runtimeTask = null; + Preconditions.checkState(runtimeTask.isTaskDone(), + "Runtime task must be complete before calling cleanup"); this.objectRegistry = null; this.initialMemoryDistributor = null; } http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 3d1d1a2..8b9db16 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -112,6 +113,7 @@ public class TaskReporter { public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { currentCallable.markComplete(); currentCallable = null; + // KKK Make sure the callable completes before proceeding } public void shutdown() { @@ -125,7 +127,7 @@ public class TaskReporter { private static final float LOG_COUNTER_BACKOFF = 1.3f; private final RuntimeTask task; - private EventMetaData updateEventMetadata; + private final EventMetaData updateEventMetadata; private final TezTaskUmbilicalProtocol umbilical; @@ -136,6 +138,9 @@ public class TaskReporter { private final AtomicLong requestCounter; + private final AtomicBoolean finalEventQueued = new AtomicBoolean(false); + private final AtomicBoolean askedToDie = new AtomicBoolean(false); + private LinkedBlockingQueue eventsToSend = new LinkedBlockingQueue(); private final ReentrantLock lock = new ReentrantLock(); @@ -199,6 +204,9 @@ public class TaskReporter { } int pendingEventCount = eventsToSend.size(); if (pendingEventCount > 0) { + // This is OK because the pending events will be sent via the succeeded/failed messages. + // TaskDone is set before taskSucceeded / taskFailed are sent out - which is what causes the + // thread to exit. LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount); } return true; @@ -256,6 +264,7 @@ public class TaskReporter { if (response.shouldDie()) { LOG.info("Received should die response from AM"); + askedToDie.set(true); return new ResponseWrapper(true, 1); } if (response.getLastRequestId() != requestId) { @@ -268,7 +277,7 @@ public class TaskReporter { int numEventsReceived = 0; if (task.isTaskDone() || task.hadFatalError()) { if (response.getEvents() != null && !response.getEvents().isEmpty()) { - LOG.warn("Current task already complete, Ignoring all event in" + LOG.info("Current task already complete, Ignoring all event in" + " heartbeat response, eventCount=" + response.getEvents().size()); } } else { @@ -315,10 +324,16 @@ public class TaskReporter { * indicates an exception somewhere in the AM. */ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); - TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), - updateEventMetadata); - return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; + // Ensure only one final event is ever sent. + if (!finalEventQueued.getAndSet(true)) { + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), + updateEventMetadata); + return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; + } else { + LOG.warn("A final task state event has already been sent. Not sending again"); + return askedToDie.get(); + } } @VisibleForTesting @@ -351,15 +366,21 @@ public class TaskReporter { */ private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); - if (diagnostics == null) { - diagnostics = ExceptionUtils.getStackTrace(t); + // Ensure only one final event is ever sent. + if (!finalEventQueued.getAndSet(true)) { + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + if (diagnostics == null) { + diagnostics = ExceptionUtils.getStackTrace(t); + } else { + diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); + } + TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), + srcMeta == null ? updateEventMetadata : srcMeta); + return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie; } else { - diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); + LOG.warn("A final task state event has already been sent. Not sending again"); + return askedToDie.get(); } - TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), - srcMeta == null ? updateEventMetadata : srcMeta); - return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie; } private void addEvents(TezTaskAttemptID taskAttemptID, Collection events) { http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index df932cf..b337bc7 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -26,6 +27,7 @@ import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -33,6 +35,7 @@ import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -133,29 +136,43 @@ public class TestLogicalIOProcessorRuntimeTask { } - private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) { + private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) throws InterruptedException { + + ProcessorContext procContext = lio.getProcessorContext(); + List inputContexts = new LinkedList(); + inputContexts.addAll(lio.getInputContexts()); + List outputContexts = new LinkedList(); + outputContexts.addAll(lio.getOutputContexts()); lio.cleanup(); - assertTrue(lio.getProcessorContext().getUserPayload() == null); - assertTrue(lio.getProcessorContext().getObjectRegistry() == null); + assertTrue(procContext.getUserPayload() == null); + assertTrue(procContext.getObjectRegistry() == null); - try { - lio.getProcessorContext().waitForAnyInputReady(Collections.emptyList()); - fail("Processor context should have been already cleanup"); - } catch (Throwable t) { - assertTrue(t instanceof NullPointerException); + for (InputContext inputContext : inputContexts) { + assertTrue(inputContext.getUserPayload() == null); + assertTrue(inputContext.getObjectRegistry() == null); } - try { - lio.getProcessorContext().requestInitialMemory(0, null); - fail("Processor context should have been already cleanup"); - } catch (Throwable t) { - assertTrue(t instanceof NullPointerException); + for (OutputContext outputContext : outputContexts) { + assertTrue(outputContext.getUserPayload() == null); + assertTrue(outputContext.getObjectRegistry() == null); } - assertTrue(lio.getInputContexts().size() == 0); - assertTrue(lio.getOutputContexts().size() == 0); + assertEquals(0, lio.inputSpecs.size()); + assertEquals(0, lio.inputsMap.size()); + assertEquals(0, lio.inputContextMap.size()); + assertEquals(0, lio.outputSpecs.size()); + assertEquals(0, lio.outputsMap.size()); + assertEquals(0, lio.outputContextMap.size()); + assertTrue(lio.groupInputSpecs == null || lio.groupInputSpecs.size() == 0); + assertNull(lio.groupInputsMap); + assertNull(lio.processor); + assertNull(lio.processorContext); + assertEquals(0, lio.runInputMap.size()); + assertEquals(0, lio.runOutputMap.size()); + assertEquals(0, lio.eventsToBeProcessed.size()); + assertNull(lio.eventRouterThread); } private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID, @@ -248,7 +265,6 @@ public class TestLogicalIOProcessorRuntimeTask { public void start() throws Exception { startCount++; this.vertexParallelism = getContext().getVertexParallelism(); - System.err.println("In started"); } @Override