kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
Date Tue, 02 Jan 2018 22:25:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308810#comment-16308810 ]

ASF GitHub Bot commented on KAFKA-6383:
---------------------------------------

rodesai closed pull request #4378: Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads
(#4343)"
URL: https://github.com/apache/kafka/pull/4378
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ff440cc49a3..696081d36c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,26 +173,23 @@ public State state() {
     /**
      * Sets the state
      * @param newState New state
-     * @return The state prior to the call to setState, or null if the transition is invalid
      */
-    State setState(final State newState) {
-        final State oldState;
+    boolean setState(final State newState) {
+        final State oldState = state;
 
         synchronized (stateLock) {
-            oldState = state;
-
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will
be
                 // refused but we do not throw exception here
-                return null;
+                return false;
             } else if (state == State.DEAD) {
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
-                return null;
+                return false;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED)
{
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself
will be
                 // refused but we do not throw exception here
-                return null;
+                return false;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state transition from
" + oldState + " to " + newState);
@@ -212,7 +209,7 @@ State setState(final State newState) {
             stateListener.onChange(this, state, oldState);
         }
 
-        return oldState;
+        return true;
     }
 
     public boolean isRunningAndNotRebalancing() {
@@ -254,7 +251,7 @@ public void onPartitionsAssigned(final Collection<TopicPartition>
assignment) {
 
             final long start = time.milliseconds();
             try {
-                if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
+                if (!streamThread.setState(State.PARTITIONS_ASSIGNED)) {
                     return;
                 }
                 taskManager.createTasks(assignment);
@@ -284,7 +281,7 @@ public void onPartitionsRevoked(final Collection<TopicPartition>
assignment) {
                 taskManager.activeTaskIds(),
                 taskManager.standbyTaskIds());
 
-            if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
+            if (streamThread.setState(State.PARTITIONS_REVOKED)) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
@@ -717,11 +714,7 @@ public StreamThread(final Time time,
     @Override
     public void run() {
         log.info("Starting");
-        if (setState(State.RUNNING) == null) {
-            log.info("StreamThread already shutdown. Not running");
-            completeShutdown(true);
-            return;
-        }
+        setState(State.RUNNING);
         boolean cleanRun = false;
         try {
             runLoop();
@@ -1095,11 +1088,7 @@ private long computeLatency() {
      */
     public void shutdown() {
         log.info("Informed to shut down");
-        State oldState = setState(State.PENDING_SHUTDOWN);
-        if (oldState == State.CREATED) {
-            // Start so that we shutdown on the thread
-            this.start();
-        }
+        setState(State.PENDING_SHUTDOWN);
     }
 
     private void completeShutdown(final boolean cleanRun) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00153f..d70c8f393e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -274,11 +274,7 @@ void shutdown(final boolean clean) {
         standby.close(clean);
 
         // remove the changelog partitions from restore consumer
-        try {
-            restoreConsumer.unsubscribe();
-        } catch (final RuntimeException fatalException) {
-            firstException.compareAndSet(null, fatalException);
-        }
+        restoreConsumer.unsubscribe();
         taskCreator.close();
         standbyTaskCreator.close();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a2084b02aea..8746c62a50f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -32,7 +31,6 @@
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
@@ -115,28 +113,6 @@ public void testStateCloseAfterCreate() {
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
-    @Test
-    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder.globalTable("anyTopic");
-        MockClientSupplier clientSupplier = new MockClientSupplier();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props),
clientSupplier);
-        streams.close();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.NOT_RUNNING;
-            }
-        }, 10 * 1000, "Streams never stopped.");
-
-        // Ensure that any created clients are closed
-        assertTrue(clientSupplier.consumer.closed());
-        assertTrue(clientSupplier.restoreConsumer.closed());
-        for (MockProducer p : clientSupplier.producers) {
-            assertTrue(p.closed());
-        }
-    }
-
     @Test
     public void testStateThreadClose() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cca7045b19a..42504655117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -441,13 +441,8 @@ public void shouldShutdownTaskManagerOnClose() throws InterruptedException
{
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-                metrics,
-                "",
-                "",
-                Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(
-                mockTime,
+        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics,
"", "", Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(mockTime,
                 config,
                 consumer,
                 consumer,
@@ -463,40 +458,6 @@ public void shouldShutdownTaskManagerOnClose() throws InterruptedException
{
         EasyMock.verify(taskManager);
     }
 
-    @Test
-    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-        taskManager.shutdown(true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(taskManager, consumer);
-
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-                metrics,
-                "",
-                "",
-                Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(
-                mockTime,
-                config,
-                consumer,
-                consumer,
-                null,
-                taskManager,
-                streamsMetrics,
-                internalTopologyBuilder,
-                clientId,
-                new LogContext(""));
-        thread.shutdown();
-        try {
-            thread.join(1000);
-        } catch (final InterruptedException e) {
-            fail("Join interrupted");
-        }
-        assertFalse(thread.isAlive());
-        EasyMock.verify(taskManager);
-    }
-
     @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology()
throws InterruptedException {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6383
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6383
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Rohan Desai
>            Assignee: Rohan Desai
>             Fix For: 1.1.0
>
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a bunch of StreamThreads via StreamThread.create,
> which in turn creates a bunch of stuff (including a producer). These resources are cleaned
> up only when the thread exits. So if the thread was never started, then they are never cleaned
> up. StreamThread.shutdown should clean up if it sees that the thread has never been started.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message