From commits-return-9130-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Mar 7 02:49:39 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 00EE3180652 for ; Wed, 7 Mar 2018 02:49:38 +0100 (CET) Received: (qmail 10473 invoked by uid 500); 7 Mar 2018 01:49:37 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 10464 invoked by uid 99); 7 Mar 2018 01:49:37 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Mar 2018 01:49:37 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 61ACE822B8; Wed, 7 Mar 2018 01:49:37 +0000 (UTC) Date: Wed, 07 Mar 2018 01:49:37 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152038737690.18416.16785953665572345960@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 23c1c52c85eaca9d51046f5235721a3239c2a229 X-Git-Newrev: cf092aeecc473b70d81c00b604e29de8c9f6d84b X-Git-Rev: cf092aeecc473b70d81c00b604e29de8c9f6d84b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cf092ae KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645) cf092ae is described below commit cf092aeecc473b70d81c00b604e29de8c9f6d84b Author: nafshartous AuthorDate: Tue Mar 6 20:49:33 2018 -0500 KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645) Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang --- .../processor/internals/ProcessorContextImpl.java | 8 ++--- .../internals/StreamsPartitionAssignor.java | 3 +- .../org/apache/kafka/streams/TopologyTest.java | 16 ++++------ .../streams/processor/TopologyBuilderTest.java | 37 ++++++++++------------ .../internals/InternalTopologyBuilderTest.java | 17 +++++----- 5 files changed, 37 insertions(+), 44 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 317581a..42d3d70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -54,13 +55,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } /** - * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node + * @throws StreamsException if an attempt is made to access this state store from an unknown node */ - @SuppressWarnings("deprecation") @Override public StateStore getStateStore(final String name) { if (currentNode() == null) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node"); + throw new StreamsException("Accessing from an unknown node"); } final StateStore global = stateManager.getGlobalStore(name); @@ -69,7 +69,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } if (!currentNode().stateStores.contains(name)) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name); + throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name); } return stateManager.getStore(name); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 71a84b2..0edbe2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TaskId; @@ -694,7 +695,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable continue; } if (numPartitions < 0) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); + throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); } topic.setNumberOfPartitions(numPartitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 992ffd8..6834091 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -252,7 +251,7 @@ public class TopologyTest { } catch (final TopologyException expected) { } } - @Test(expected = TopologyBuilderException.class) + @Test public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; @@ -276,15 +275,12 @@ public class TopologyTest { try { new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder); + fail("Should have thrown StreamsException"); } catch (final StreamsException e) { - final Throwable cause = e.getCause(); - if (cause != null - && cause instanceof TopologyBuilderException - && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { - throw (TopologyBuilderException) cause; - } else { - throw new RuntimeException("Did expect different exception. Did catch:", e); - } + final String error = e.toString(); + final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; + + assertThat(error, equalTo(expectedMessage)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 7a81594..f67b634 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -51,6 +51,7 @@ import java.util.regex.Pattern; import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -590,8 +591,7 @@ public class TopologyBuilderTest { assertEquals("appId-foo", topicConfig.name()); } - - @Test(expected = TopologyBuilderException.class) + @Test public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; @@ -603,27 +603,24 @@ public class TopologyBuilderTest { config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); final StreamsConfig streamsConfig = new StreamsConfig(config); - try { - final TopologyBuilder builder = new TopologyBuilder(); - builder - .addSource(sourceNodeName, "topic") - .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName) + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource(sourceNodeName, "topic") + .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), + sourceNodeName) .addStateStore( - Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), - goodNodeName) - .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); - + Stores.create(LocalMockProcessorSupplier.STORE_NAME) + .withStringKeys().withStringValues().inMemory() + .build(), goodNodeName) + .addProcessor(badNodeName, new LocalMockProcessorSupplier(), + sourceNodeName); + try { final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder); - driver.process("topic", null, null); + fail("Should have thrown StreamsException"); } catch (final StreamsException e) { - final Throwable cause = e.getCause(); - if (cause != null - && cause instanceof TopologyBuilderException - && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { - throw (TopologyBuilderException) cause; - } else { - throw new RuntimeException("Did expect different exception. Did catch:", e); - } + final String error = e.toString(); + final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; + + assertThat(error, equalTo(expectedMessage)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index a39e545..901fc4b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -53,7 +52,9 @@ import java.util.regex.Pattern; import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -578,17 +579,15 @@ public class InternalTopologyBuilderTest { Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName); builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); - + try { new ProcessorTopologyTestDriver(streamsConfig, builder); fail("Should have throw StreamsException"); - } catch (final StreamsException expected) { - final Throwable cause = expected.getCause(); - if (cause == null - || !(cause instanceof TopologyBuilderException) - || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { - throw new RuntimeException("Did expect different exception. Did catch:", expected); - } + } catch (final StreamsException e) { + final String error = e.toString(); + final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName; + + assertThat(error, equalTo(expectedMessage)); } } -- To stop receiving notification emails like this one, please contact guozhang@apache.org.