kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
Date Wed, 07 Mar 2018 01:50:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388870#comment-16388870
] 

ASF GitHub Bot commented on KAFKA-5660:
---------------------------------------

guozhangwang closed pull request #4645: KAFKA-5660 Don't throw TopologyBuilderException during
runtime
URL: https://github.com/apache/kafka/pull/4645
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 317581a6beb..42d3d70e396 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -54,13 +55,12 @@ public RecordCollector recordCollector() {
     }
 
     /**
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is
made to access this state store from an unknown node
+     * @throws StreamsException if an attempt is made to access this state store from an
unknown node
      */
-    @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing
from an unknown node");
+            throw new StreamsException("Accessing from an unknown node");
         }
 
         final StateStore global = stateManager.getGlobalStore(name);
@@ -69,7 +69,7 @@ public StateStore getStateStore(final String name) {
         }
 
         if (!currentNode().stateStores.contains(name)) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor
" + currentNode().name() + " has no access to StateStore " + name);
+            throw new StreamsException("Processor " + currentNode().name() + " has no access
to StateStore " + name);
         }
 
         return stateManager.getStore(name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9aa0e94c8c1..d4e7a5d5c62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
@@ -644,7 +645,7 @@ private void prepareTopic(final Map<String, InternalTopicMetadata>
topicPartitio
                 continue;
             }
             if (numPartitions < 0) {
-                throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic
[%s] number of partitions not defined", logPrefix, topic.name()));
+                throw new StreamsException(String.format("%sTopic [%s] number of partitions
not defined", logPrefix, topic.name()));
             }
 
             topic.setNumberOfPartitions(numPartitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 992ffd818ff..68340910c2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -252,7 +251,7 @@ public void shouldNotAllowToAddStoreWithSameName() {
         } catch (final TopologyException expected) { }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -276,15 +275,12 @@ public void shouldThrowOnUnassignedStateStoreAccess() throws Exception
{
 
         try {
             new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: Processor
" + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME))
{
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. Did catch:",
e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException:
failed to initialize processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7a815944ecd..f67b6341f8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -590,8 +591,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() {
         assertEquals("appId-foo", topicConfig.name());
     }
 
-
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -603,27 +603,24 @@ public void shouldThroughOnUnassignedStateStoreAccess() throws Exception
{
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
-        try {
-            final TopologyBuilder builder = new TopologyBuilder();
-            builder
-                .addSource(sourceNodeName, "topic")
-                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(sourceNodeName, "topic")
+                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName)
                 .addStateStore(
-                    Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
-                    goodNodeName)
-                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
+                        Stores.create(LocalMockProcessorSupplier.STORE_NAME)
+                                .withStringKeys().withStringValues().inMemory()
+                                .build(), goodNodeName)
+                .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName);     
+        try {
             final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig,
builder.internalTopologyBuilder);
-            driver.process("topic", null, null);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: Processor
" + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME))
{
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. Did catch:",
e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException:
failed to initialize processor " + badNodeName;
+
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index a39e545f513..901fc4b7e0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -53,7 +52,9 @@
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -578,17 +579,15 @@ public void shouldThrowOnUnassignedStateStoreAccess() {
             Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
             goodNodeName);
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
+        
         try {
             new ProcessorTopologyTestDriver(streamsConfig, builder);
             fail("Should have throw StreamsException");
-        } catch (final StreamsException expected) {
-            final Throwable cause = expected.getCause();
-            if (cause == null
-                || !(cause instanceof TopologyBuilderException)
-                || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName
+ " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
-                throw new RuntimeException("Did expect different exception. Did catch:",
expected);
-            }
+        } catch (final StreamsException e) {
+            final String error = e.toString();
+            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException:
failed to initialize processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Don't throw TopologyBuilderException during runtime
> ---------------------------------------------------
>
>                 Key: KAFKA-5660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5660
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Nick Afshartous
>            Priority: Major
>             Fix For: 1.2.0
>
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be thrown before {{KafkaStreams#start()}}
is called.
> However, we do throw {{TopologyBuilderException}} within
>  - `SourceNodeFactory#getTopics`
>  - `ProcessorContextImpl#getStateStore`
>  - `StreamPartitionAssignor#prepareTopic`
> (and maybe somewhere else: we should double check if there are other places in the code
like those).
> We should replace those exceptions with either {{StreamsException}} or with a new exception
type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message