Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 67B0F200BA6 for ; Tue, 18 Oct 2016 16:36:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 662C1160ADC; Tue, 18 Oct 2016 14:36:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 855B0160ACC for ; Tue, 18 Oct 2016 16:36:41 +0200 (CEST) Received: (qmail 46116 invoked by uid 500); 18 Oct 2016 14:36:40 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 46096 invoked by uid 99); 18 Oct 2016 14:36:40 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Oct 2016 14:36:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id AA91BC1885 for ; Tue, 18 Oct 2016 14:36:39 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.68 X-Spam-Level: * X-Spam-Status: No, score=1.68 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, HTML_OBFUSCATE_05_10=0.001, RCVD_IN_DNSWL_LOW=-0.7, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id aP2cM6PkttWv for ; Tue, 18 Oct 2016 14:36:36 +0000 (UTC) Received: from mail-qt0-f181.google.com (mail-qt0-f181.google.com [209.85.216.181]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 5CCEB5F24F for ; Tue, 18 Oct 2016 14:36:36 +0000 (UTC) Received: by mail-qt0-f181.google.com with SMTP id s49so155670960qta.0 for ; Tue, 18 Oct 2016 07:36:36 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=VnTIXfdh4q/LfraMFu8f1kbwhp27P5wn81S08tPJLrM=; b=Mkc4Xyn1hYpprUkfzU0XqmLq8l2Jyf3JfcXBJVQalekQ87JSTIHUbX+z7dNlPnpfh1 OVHTD3rn9yQ7B9ae7ATmbNFAVEyGNWQw4StAVt904/CAXYYR0AIzDt1WdkboAB0qZZai QikoX+UwfqXuCObWWJiXpvvDHkCv48ahtLofvypASkv1z1l19tS4w0bZGgKIs7t3R1/h aEqdn6ZuDkc8I+gPTA/3hxPTsqUZ4JhRdN1WT8TdlLjhVSC0wOjU/gpMCazPNKaldXks NGK8yrW8rNC9DuOde/D4GhqcN7FlvZKFnr+hep8YwthAAAZJxMughMNPYWViqyT4WrjO PgLQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=VnTIXfdh4q/LfraMFu8f1kbwhp27P5wn81S08tPJLrM=; b=KBSWcVrKtWM968jwK3SdxnyQW/ekaFgW/RjcwjUbribXktUtayiOIksSsSsbxaqVGd tC3kcCkO1CUA/cMHdw5zhF1Gj5lTBdC9IbyBTcwTbhQqMpaU+D89TpTsvKNhCf0/gBio lWs5YVIA6NtYet19AoD1uo3VYdUGwauUyahE2k6moErfRzJu9B0namERYgJEQFPYJApA cfTL1wDwSziI2y2RpqsmOUu72hKxwCCOabxM2yEPp01KlWJkBNG8GIjhg9FzTcCRVvQs gdkUA9SNMtH36npk7UKLY/qXygL8xhMeawQLdbbZ3X4yiK/eENp+gJkwMd+uMlS7l1mV 5Ikg== X-Gm-Message-State: AA6/9Rl/75x2TBjD0JS1jVjYRZdm/kZiDDDJF25bpqBHBTb3PStDSJOBmwcYwwUoBZCy7nn7Zw6sEULD4VGReQ== X-Received: by 10.194.97.69 with SMTP id dy5mr429837wjb.53.1476801395556; Tue, 18 Oct 2016 07:36:35 -0700 (PDT) MIME-Version: 1.0 Received: by 10.28.17.66 with HTTP; Tue, 18 Oct 2016 07:36:34 -0700 (PDT) In-Reply-To: References: <08a1d28d-a157-11a5-f9ca-91635066ffc7@confluent.io> From: Florian Hussonnois Date: Tue, 18 Oct 2016 16:36:34 +0200 Message-ID: Subject: Re: KStreams / add support for sink processor with dynamic topics To: dev@kafka.apache.org Content-Type: multipart/alternative; boundary=089e0103e2f696092b053f249e65 archived-at: Tue, 18 Oct 2016 14:36:42 -0000 --089e0103e2f696092b053f249e65 Content-Type: text/plain; charset=UTF-8 Thank you Matthias for your answers. The mailing list that you linked shows a solution using the Processor API. Actually, the set of subtypes is not known in advance this is why I need to compute output topics from messages. So the branch method is of any help in my context. I think, this feature should be supported by the DSL as the Processor API solution is not completely safe. 2016-10-18 10:01 GMT+02:00 Damian Guy : > Hi Florian, > > Do you know the set of subtypes in advance? I.e, could you use: > > KStream[] branches = stream.branch(predicates); > > to split the stream based on the subtypes? > > Thanks, > Damian > > > On Tue, 18 Oct 2016 at 00:43 Matthias J. Sax > wrote: > > > -----BEGIN PGP SIGNED MESSAGE----- > > Hash: SHA512 > > > > Hi, > > > > using DSL you cannot do this. However, if you use Processor API you can. > > > > There are similar question on the mailing list already. For example: > > http://search-hadoop.com/m/uyzND1lghNN1tzbf41&subj=kafka+stream+to+new+t > > opic+based+on+message+key > > > > As we got this request multiple times already, it might be worth > > adding it IMHO. Not sure what the opinion of other is? We should make > > sure that the feature gets accepted before you put a lot of effort in > > it. :) > > > > > > - -Matthias > > > > On 10/17/16 2:10 PM, Florian Hussonnois wrote: > > > Hi All, > > > > > > Currently, it seems not possible with KStream to produce messages > > > to topics which are not known until runtime. > > > > > > For a new project I am evaluating the Kafka Connect / Kafka > > > Streams architecture but without that feature I cannot retain the > > > KStreams API. > > > > > > Our use case is pretty basic. We have xml messages in input of our > > > topology. Each message is splitted into subtypes and formatted in > > > Avro before being sent to a dedicated topic. > > > > > > So the output topics depend of the subtype of each message. > > > > > > I think it would be nice to add methods into the KStream interface > > > to provide such feature. > > > > > > If you think that feature would be usefull I can create a jira and > > > contribute to it. Also, do I need to create a new KIP as this > > > requires changes on a public API ? > > > > > > Thanks, > > > > > -----BEGIN PGP SIGNATURE----- > > Comment: GPGTools - https://gpgtools.org > > > > iQIcBAEBCgAGBQJYBWIhAAoJECnhiMLycopPfTQQAI69Uii5xd8KvaEo/Aeqs0Xw > > AzdPHekdVoHANzo1h45W1x3/lnyeMU/n2v09Agsz46cxb+Xbz9NOKGqT3v9Ye0Ic > > Eyl5yib1B6sWr41rGuYmwDH8zBoC8dPfGZiWhfXL4Sypey3RWzQlVAUWg8Ob4tqF > > rFeubMjWp7yopKRe/7n//JHF029hVK/ePk1vdEsI+2lBI4N7q9ONT/1wKkeCAtdd > > CCkI2WP/WbHzCcUVmOL41KoqgQFnmrH7BtLH67jumzEIR16H+ZenGZmS1uzde56E > > 9mEsl4wmAvfB5GJu6y7JnS7FnQotw7pV7ZneQrA2q8eCZHZqs2fkXf+6ZJNYRir+ > > rysqt8wJG69ZN9bSNO1Q6/fNbRiSjYi0I7JnzkErP6scfDKlf3bWzlw6Ejc0+iUr > > Cd0x2m/RlCepVleMT0UZNDlJd0Ml9Q77npP1lyntHVYHjVvtZLdlB5BQYdTMAx3N > > KCLZ+WkY2CBKcwh/KuMr9kW2eWSxH89JSwEule+1bN4vSKyBA6vtrwDoshf6N23g > > dEhTiY5NsgkvAe1JEK6d7PLN2Tq1Tq4OJNoP8PZlqW+YSFl41klQUblo8yT1jSlF > > iCyQS4rgNRabjBs1iZnZNoZ5eodoJMcUyWPhHUYHne+MXuSr1cNNGeNcbS5W0UyE > > dPCe2IiY4zErzxW/Mjmw > > =4DpY > > -----END PGP SIGNATURE----- > > > -- Florian HUSSONNOIS --089e0103e2f696092b053f249e65--