kafka-dev mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values
Date Wed, 01 Feb 2017 00:44:52 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847802#comment-15847802 ]

Matthias J. Sax commented on KAFKA-3543:
----------------------------------------

[~gfodor] I'm closing this as a duplicate. One question nevertheless: I don't understand the remark
in the JIRA description about "just calling forward() myself on the context and actually emitting
dummy values which are filtered out downstream". Using {{context.forward()}} is absolutely fine
and efficient.
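To illustrate the pattern Matthias refers to: a Transformer keeps the ProcessorContext it receives in init(), calls forward() once per output record, and returns null from transform() so that nothing extra is emitted and no dummy values need to be filtered downstream. The sketch below models this in plain Java only. `Context`, `Transformer`, `SplitWords` and `run` are simplified, hypothetical stand-ins, not the Kafka Streams API. It also assumes a Kafka Streams version in which returning null from transform() forwards nothing; check the Transformer Javadoc for your version.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ForwardSketch {

    // Hypothetical stand-in for ProcessorContext: forward() hands one record downstream.
    interface Context<K, V> {
        void forward(K key, V value);
    }

    // Hypothetical stand-in for Transformer: it may forward any number of records
    // itself and return null so that the driver emits nothing extra.
    interface Transformer<K, V, K1, V1> {
        void init(Context<K1, V1> context);
        Map.Entry<K1, V1> transform(K key, V value);
    }

    // Emits one (word, length) record per word of the input value.
    static class SplitWords implements Transformer<String, String, String, Integer> {
        private Context<String, Integer> context;

        @Override
        public void init(Context<String, Integer> context) {
            this.context = context; // hold on to the context, as described in the issue
        }

        @Override
        public Map.Entry<String, Integer> transform(String key, String line) {
            for (String word : line.split("\\s+")) {
                context.forward(word, word.length());
            }
            return null; // everything was forwarded already: no dummy value to filter
        }
    }

    // Minimal driver: runs the transformer over the input and collects downstream records.
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> run(
            Transformer<K, V, K1, V1> t, List<Map.Entry<K, V>> input) {
        List<Map.Entry<K1, V1>> downstream = new ArrayList<>();
        t.init((k, v) -> downstream.add(new SimpleEntry<>(k, v)));
        for (Map.Entry<K, V> record : input) {
            Map.Entry<K1, V1> result = t.transform(record.getKey(), record.getValue());
            if (result != null) { // models "null means forward nothing"
                downstream.add(result);
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input =
                List.of(new SimpleEntry<>("k", "allow multiple values"));
        System.out.println(run(new SplitWords(), input)); // prints [allow=5, multiple=8, values=6]
    }
}
```

One input record yields three downstream records without any sentinel value, which is the behaviour the reporter was emulating with dummy values and a downstream filter.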

> Allow a variant of transform() which can emit multiple values
> -------------------------------------------------------------
>
>                 Key: KAFKA-3543
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3543
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>              Labels: api
>             Fix For: 0.10.3.0
>
>
> Right now it seems that if you want to apply an arbitrary stateful transformation to
> a stream, you have to pass either a TransformerSupplier to transform() or a
> ProcessorSupplier to process(). A custom processor lets you emit multiple new values,
> but process() currently terminates that branch of the topology, so you can't apply any
> further data flow. transform() lets you continue the data flow, but forces you to emit
> a single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the
> ProcessorContext and emit multiple values, but that's probably not the ideal way to do it :))
> It seems desirable to somehow allow a transformation that emits multiple values per
> input value. I'm not sure of the best way to fit this into the current
> TransformerSupplier/Transformer architecture cleanly and efficiently. Currently I'm
> doing the workaround above of just calling forward() myself on the context and actually
> emitting dummy values which are filtered out downstream.
> -------------
> It is worth considering adding a new flatTransform function such as
> {code}
> <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<K, V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String... stateStoreNames)
> {code}
> which is essentially the same as
> {code} transform().flatMap() {code}
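The claimed equivalence can be checked on plain Java collections: applying a transformer that returns an Iterable of records and flattening the result gives the same output as mapping each record to its Iterable and then flat-mapping. In the sketch below, `flatTransform`, `transformThenFlatMap` and `splitWords` are hypothetical names, records are modelled as `Map.Entry`, and a `Function` stands in for a Transformer, so state stores are deliberately left out. It is not the Kafka Streams API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class FlatTransformSketch {

    // Hypothetical model of the proposed flatTransform: the transformer returns an
    // Iterable of output records per input record, and the results are flattened.
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> flatTransform(
            List<Map.Entry<K, V>> input,
            Function<Map.Entry<K, V>, Iterable<Map.Entry<K1, V1>>> transformer) {
        List<Map.Entry<K1, V1>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            for (Map.Entry<K1, V1> result : transformer.apply(record)) {
                out.add(result);
            }
        }
        return out;
    }

    // The two-step form from the issue: transform() to an Iterable, then flatMap().
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> transformThenFlatMap(
            List<Map.Entry<K, V>> input,
            Function<Map.Entry<K, V>, Iterable<Map.Entry<K1, V1>>> transformer) {
        return input.stream()
                .map(transformer) // one Iterable per input record
                .flatMap(it -> StreamSupport.stream(it.spliterator(), false))
                .collect(Collectors.toList());
    }

    // Example transformer: emits (word, 1) for every word of the record's value.
    static Iterable<Map.Entry<String, Integer>> splitWords(Map.Entry<String, String> record) {
        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        for (String word : record.getValue().split("\\s+")) {
            results.add(new SimpleEntry<>(word, 1));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                new SimpleEntry<>("a", "to be"), new SimpleEntry<>("b", "or not"));
        System.out.println(flatTransform(input, FlatTransformSketch::splitWords)); // prints [to=1, be=1, or=1, not=1]
        System.out.println(flatTransform(input, FlatTransformSketch::splitWords)
                .equals(transformThenFlatMap(input, FlatTransformSketch::splitWords))); // prints true
    }
}
```

The single-step form avoids materialising an intermediate stream of Iterables as separate records, which is the main argument for a dedicated operator over chaining transform() and flatMap().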



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
