Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2FE12160C05 for ; Wed, 3 Jan 2018 21:11:06 +0100 (CET) Received: (qmail 55673 invoked by uid 500); 3 Jan 2018 20:11:05 -0000 Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jira@kafka.apache.org Delivered-To: mailing list jira@kafka.apache.org Received: (qmail 55662 invoked by uid 99); 3 Jan 2018 20:11:05 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Jan 2018 20:11:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id DEB4BC024E for ; Wed, 3 Jan 2018 20:11:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -100.011 X-Spam-Level: X-Spam-Status: No, score=-100.011 tagged_above=-999 required=6.31 tests=[RCVD_IN_DNSWL_NONE=-0.0001, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id PELcGTRGCKHk for ; Wed, 3 Jan 2018 20:11:03 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 3E7165FACE for ; Wed, 3 Jan 2018 20:11:02 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 5AF1FE1288 for ; Wed, 3 Jan 2018 20:11:01 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 73DF52411A for ; Wed, 3 Jan 2018 20:11:00 +0000 (UTC) Date: Wed, 3 Jan 2018 20:11:00 +0000 (UTC) From: "Guozhang Wang (JIRA)" To: jira@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Assigned] (KAFKA-6398) Stream-Table join fails, if table is not materialized MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Wed, 03 Jan 2018 20:11:06 -0000 [ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-6398: ------------------------------------ Assignee: Guozhang Wang > Stream-Table join fails, if table is not materialized > ----------------------------------------------------- > > Key: KAFKA-6398 > URL: https://issues.apache.org/jira/browse/KAFKA-6398 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.11.0.1, 1.0.0 > Reporter: Matthias J. Sax > Assignee: Guozhang Wang > > Using a non-materialized KTable in a stream-table join fails: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(...); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > fails with > {noformat} > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet. > at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021) > at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949) > at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621) > at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577) > at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563) > {noformat} > Adding a store name is not sufficient as workaround but fails differently: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME"); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > error: > {noformat} > org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005 > at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113) > at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339) > at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153) > Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000 > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69) > at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45) > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121) > at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44) > at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53) > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111) > {noformat} > One can workaround by piping the result through a topic: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");; > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)