From dev-return-92925-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Tue Apr 3 12:08:05 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3AD12180675 for ; Tue, 3 Apr 2018 12:08:05 +0200 (CEST) Received: (qmail 16116 invoked by uid 500); 3 Apr 2018 10:08:03 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 16100 invoked by uid 99); 3 Apr 2018 10:08:03 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Apr 2018 10:08:03 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6919D1A02D2 for ; Tue, 3 Apr 2018 10:08:03 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -102.311 X-Spam-Level: X-Spam-Status: No, score=-102.311 tagged_above=-999 required=6.31 tests=[RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id G5ZGplXyLkBS for ; Tue, 3 Apr 2018 10:08:02 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 63B525F19C for ; Tue, 3 Apr 2018 10:08:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 84839E0185 for 
; Tue, 3 Apr 2018 10:08:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 0A88F25612 for ; Tue, 3 Apr 2018 10:08:00 +0000 (UTC) Date: Tue, 3 Apr 2018 10:08:00 +0000 (UTC) From: "Valentino Proietti (JIRA)" To: dev@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Created] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 Valentino Proietti created KAFKA-6742: ----------------------------------------- Summary: TopologyTestDriver error when dealing with stores from GlobalKTable Key: KAFKA-6742 URL: https://issues.apache.org/jira/browse/KAFKA-6742 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Valentino Proietti {color:#FF0000}This junit test simply fails:{color} @Test *public* *void* globalTable() { StreamsBuilder builder = *new* StreamsBuilder(); @SuppressWarnings("unused") *final* KTable localTable = builder .table("local",  Consumed._with_(Serdes._String_(), Serdes._String_()), Materialized._as_("localStore")) ; @SuppressWarnings("unused") *final* GlobalKTable globalTable = builder .globalTable("global",  Consumed._with_(Serdes._String_(), Serdes._String_()),         Materialized._as_("globalStore")) ; // Properties props = *new* Properties(); props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test"); props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost"); TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), props); // *final* KeyValueStore localStore = testDriver.getKeyValueStore("localStore"); Assert._assertNotNull_(localStore); Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore")); // *final*
KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore"); Assert._assertNotNull_(globalStore); Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore")); //     *final* ConsumerRecordFactory crf = *new* ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer()); testDriver.pipeInput(crf.create("local", "one", "TheOne")); testDriver.pipeInput(crf.create("global", "one", "TheOne")); // Assert._assertEquals_("TheOne", localStore.get("one")); Assert._assertEquals_("TheOne", globalStore.get("one"));     {color:#FF0000}to make it work I had to modify the TopologyTestDriver class as follows:{color} ...     *public* Map getAllStateStores() { //        final Map allStores = new HashMap<>(); //        for (final String storeName : internalTopologyBuilder.allStateStoreName()) { //            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); //        } //        return allStores;     {color:#FF0000}// *FIXME*{color}     *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();         *final* Map allStores = *new* HashMap<>();         *for* (*final* String storeName : internalTopologyBuilder.allStateStoreName()) {             StateStore res = psm.getStore(storeName);             *if* (res == *null*)             res = psm.getGlobalStore(storeName);             allStores.put(storeName, res);         }         *return* allStores;     } ...
    *public* StateStore getStateStore(*final* String name) { //        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);         {color:#FF0000}// *FIXME*{color}     *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();         StateStore res = psm.getStore(name);         *if* (res == *null*)         res = psm.getGlobalStore(name);         *return* res;     }   {color:#FF0000}moreover I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used alongside the "normal" stream processing by adding the method:{color}     /**      * *@return* records sent with this producer are automatically streamed to the topology.      */     *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     *return* producer;     }   {color:#FF0000}unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test:{color} ...
** // ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, cr.timestamp(), cr.key(), cr.value())); testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, cr.timestamp(), cr.key(), cr.value())); testDriver.advanceWallClockTime(0); Assert._assertEquals_("TheOne", localStore.get("one")); Assert._assertEquals_("Second", localStore.get("two")); Assert._assertEquals_("TheOne", globalStore.get("one")); Assert._assertEquals_("Second", globalStore.get("two")); }   {color:#FF0000}that could be fixed with:{color}       *private* *void* captureOutputRecords() {         // Capture all the records sent to the producer ...         *final* List<ProducerRecord<*byte*[], *byte*[]>> output = producer.history();         producer.clear();         *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {             Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = outputRecordsByTopic.get(record.topic());             *if* (outputRecords == *null*) {                 outputRecords = *new* LinkedList<>();                 outputRecordsByTopic.put(record.topic(), outputRecords);             }             outputRecords.add(record);               // Forward back into the topology if the produced record is to an internal or a source topic ...
            *final* String outputTopicName = record.topic();             *if* (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)             || globalPartitionsByTopic.containsKey(outputTopicName)) {  {color:#FF0000}// *FIXME*{color}                 *final* *byte*[] serializedKey = record.key();                 *final* *byte*[] serializedValue = record.value();                   pipeInput(*new* ConsumerRecord<>(                     outputTopicName,                     -1,                     -1L,                     record.timestamp(),                     TimestampType.*_CREATE_TIME_*,                     0L,                     serializedKey == *null* ? 0 : serializedKey.length,                     serializedValue == *null* ? 0 : serializedValue.length,                     serializedKey,                     serializedValue));             }         }     }       *Thank you* -- This message was sent by Atlassian JIRA (v7.6.3#76005)