From commits-return-8734-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Thu Jan 18 17:57:48 2018
From: guozhang@apache.org
To: "commits@kafka.apache.org"
Date: Thu, 18 Jan 2018 16:57:45 +0000
Subject: [kafka] branch trunk updated: improve internal topic integration test (#4437)
Message-ID: <151629466509.16000.8438102404405710093@gitbox.apache.org>
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: 5701ba88f895ac9211f25c15204d6281aed00ffc
X-Git-Newrev: e98162792684b0874c60003c6a596ec739c934a3
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e981627  improve internal topic integration test (#4437)
e981627 is described below

commit e98162792684b0874c60003c6a596ec739c934a3
Author: Guozhang Wang
AuthorDate: Thu Jan 18 08:57:42 2018 -0800

    improve internal topic integration test (#4437)

    Reviewers: Damian Guy
---
 .../integration/InternalTopicIntegrationTest.java | 179 +++++++++++----------
 1 file changed, 90 insertions(+), 89 deletions(-)
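A note for readers skimming the patch below: the substance of the change is (1) switching both word-count topologies from the old count(String) and count(Windows, String) overloads to the Materialized-based DSL introduced in KIP-182, and (2) asserting that the internal repartition topics use the delete cleanup policy, along with general test cleanup. The following minimal, self-contained sketch shows the new-style topology in isolation. It is illustrative only and not part of the commit: the class name and the describe() printout are invented for this example, and it assumes Kafka Streams 1.0+ on the classpath.

    import java.util.Arrays;
    import java.util.Locale;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical sketch, not from the commit: builds and prints the topology
    // without needing a running broker.
    public class WordCountTopologySketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // "inputTopic" and "Counts" mirror the names used in the test below.
            final KStream<String, String> textLines = builder.stream("inputTopic");
            textLines
                // Split each line into lower-cased words.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                // Re-key by word; this introduces an internal repartition topic.
                .groupBy((key, word) -> word)
                // Materialize the count in a store named "Counts"; its changelog
                // topic is expected to be compacted.
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts"));
            System.out.println(builder.build().describe());
        }
    }

The store name passed to Materialized.as() is what links the DSL code to the internal topic names the test looks up, i.e. "<application.id>-Counts-changelog" and "<application.id>-Counts-repartition".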
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 65a6de7..1469d18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -22,7 +22,6 @@ import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
@@ -32,24 +31,27 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.Map;
-
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -61,93 +63,90 @@ import static org.junit.Assert.assertTrue;
  */
 @Category({IntegrationTest.class})
 public class InternalTopicIntegrationTest {
-    private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final MockTime mockTime = CLUSTER.time;
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
-    private Properties streamsConfiguration;
-    private String applicationId = "compact-topics-integration-test";
+
+    private final MockTime mockTime = CLUSTER.time;
+
+    private Properties streamsProp;
 
     @BeforeClass
     public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
     }
 
     @Before
     public void before() {
-        streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProp = new Properties();
+        streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    }
+
+    @After
+    public void after() throws IOException {
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
     }
 
-    private Properties getTopicConfigProperties(final String changelog) {
-        final KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
-                DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
-                "testMetricGroup", "testMetricType");
-        try {
+    private void produceData(final List<String> inputValues) throws Exception {
+        final Properties producerProp = new Properties();
+        producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProp.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProp.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, mockTime);
+    }
+
+    private Properties getTopicProperties(final String changelog) {
+        try (KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
+                DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE,
+                Time.SYSTEM, "testMetricGroup", "testMetricType")) {
             final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+            final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
 
-            final Map<String, Properties> topicConfigs = adminZkClient.getAllTopicConfigs();
-            final Iterator it = topicConfigs.iterator();
-            while (it.hasNext()) {
-                final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
-                final String topic = topicConfig._1;
-                final Properties prop = topicConfig._2;
-                if (topic.equals(changelog)) {
-                    return prop;
-                }
+            for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
+                if (topicConfig.getKey().equals(changelog))
+                    return topicConfig.getValue();
             }
+
             return new Properties();
-        } finally {
-            kafkaZkClient.close();
         }
     }
 
     @Test
-    public void shouldCompactTopicsForStateChangelogs() throws Exception {
+    public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
+        final String appID = APP_ID + "-compact";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
         //
         // Step 1: Configure and start a simple word count topology
         //
-        final Serde<String> stringSerde = Serdes.String();
-        final Serde<Long> longSerde = Serdes.Long();
-
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
-        final KStream<String, Long> wordCounts = textLines
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(final String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                    }
-                }).groupBy(MockMapper.selectValueMapper())
-                .count("Counts").toStream();
-
-        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+            @Override
+            public Iterable<String> apply(final String value) {
+                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+            }
+        })
+            .groupBy(MockMapper.selectValueMapper())
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts"));
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
 
         //
@@ -159,41 +158,39 @@ public class InternalTopicIntegrationTest {
         // Step 3: Verify the state changelog topics are compact
         //
         streams.close();
-        final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "Counts"));
-        assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp()));
-    }
 
-    private void produceData(final List<String> inputValues) throws Exception {
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime);
+        final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
+        assertEquals(LogConfig.Compact(), changelogProps.getProperty(LogConfig.CleanupPolicyProp()));
+
+        final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
+        assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
+        assertEquals(4, repartitionProps.size());
     }
 
     @Test
-    public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
-        StreamsBuilder builder = new StreamsBuilder();
+    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception {
+        final String appID = APP_ID + "-compact-delete";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
+        //
+        // Step 1: Configure and start a simple word count topology
+        //
+        StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
         final int durationMs = 2000;
 
-        textLines
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                    }
-                }).groupBy(MockMapper.selectValueMapper())
-                .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream();
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+            @Override
+            public Iterable<String> apply(final String value) {
+                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+            }
+        })
+            .groupBy(MockMapper.selectValueMapper())
+            .windowedBy(TimeWindows.of(1000).until(2000))
+            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows"));
 
-        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
 
         //
@@ -205,7 +202,7 @@ public class InternalTopicIntegrationTest {
         // Step 3: Verify the state changelog topics are compact
         //
         streams.close();
-        final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "CountWindows"));
+        final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
         final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
         assertEquals(2, policies.size());
         assertTrue(policies.contains(LogConfig.Compact()));
@@ -213,5 +210,9 @@
         // retention should be 1 day + the window duration
         final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
         assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
+
+        final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
+        assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
+        assertEquals(4, repartitionProps.size());
     }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org".