kafka-dev mailing list archives

From "Michal Borowiecki (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL
Date Mon, 15 May 2017 13:43:04 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010530#comment-16010530
] 

Michal Borowiecki commented on KAFKA-3455:
------------------------------------------

Hi [~bobbycalderwood],

Can you please describe your use-case, where it would be useful to re-use Processor/Transformer
implementations?

As to Transformer.punctuate return value having to be null, the javadoc was in error but has
been fixed on trunk (to be released).

Changing the method signature of Transformer.punctuate would be a backward-incompatible change,
however, [KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]
will deprecate both methods in favour of a functional interface passed to ProcessorContext.schedule(),
so it's a small step in the direction you're suggesting.
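
To illustrate the general shape of "a functional interface passed to schedule()", here is a minimal, self-contained sketch. PunctuateCallback and ToyContext are hypothetical stand-ins invented for this example, not the Kafka API; the actual names and signatures are whatever KIP-138 ends up specifying.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method callback; a stand-in, not a Kafka interface.
@FunctionalInterface
interface PunctuateCallback {
    void punctuate(long timestamp);
}

// Toy context (assumption for illustration) that stores scheduled callbacks
// and fires them when its manually driven clock is advanced.
class ToyContext {
    private static final class Entry {
        final long intervalMs;
        final PunctuateCallback callback;
        long nextFireMs;

        Entry(long intervalMs, PunctuateCallback callback) {
            this.intervalMs = intervalMs;
            this.callback = callback;
            this.nextFireMs = intervalMs;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void schedule(long intervalMs, PunctuateCallback callback) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        entries.add(new Entry(intervalMs, callback));
    }

    // Advance the toy clock to nowMs, firing every callback that has become due.
    void advanceTo(long nowMs) {
        for (Entry e : entries) {
            while (e.nextFireMs <= nowMs) {
                e.callback.punctuate(e.nextFireMs);
                e.nextFireMs += e.intervalMs;
            }
        }
    }
}

class PunctuateSketch {
    static List<Long> run() {
        List<Long> fired = new ArrayList<>();
        ToyContext context = new ToyContext();
        // The callback is a lambda; no punctuate() method on the processor is needed.
        context.schedule(100L, timestamp -> fired.add(timestamp));
        context.advanceTo(250L);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [100, 200]
    }
}
```

The point of the sketch is only that the periodic logic is handed over as a value, so the interface carrying process() or transform() no longer needs a punctuate method of its own.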

I think AutoCloseable is a false friend in this case. The intention behind AutoCloseable is
for objects created in a try-with-resources statement to be closed when execution exits that
statement. However, the Processor is being created when you are *defining* the topology and
must not be closed from that same block of code, since it's used as long as the topology is
actually *running*, which is happening in different threads.
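
A small sketch of the mismatch, assuming a hypothetical ClosableProcessor class (not a Kafka type) and a plain list standing in for the topology. To keep it deterministic the "running" phase happens on the same thread, right after the definition block:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical processor that also implements AutoCloseable; not a Kafka type.
class ClosableProcessor implements AutoCloseable {
    private boolean closed = false;

    String process(String value) {
        if (closed) {
            throw new IllegalStateException("processor already closed");
        }
        return value.toUpperCase();
    }

    @Override
    public void close() {
        closed = true;
    }
}

class AutoCloseableSketch {
    static String run() {
        // Stand-in for a topology: it just holds on to the processors.
        List<ClosableProcessor> topology = new ArrayList<>();

        // "Definition time": try-with-resources closes the processor
        // as soon as this block is left.
        try (ClosableProcessor processor = new ClosableProcessor()) {
            topology.add(processor);
        }

        // "Run time": the topology still references the processor,
        // but it has already been closed.
        try {
            return topology.get(0).process("record");
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints failed: processor already closed
    }
}
```

In a real topology the processing would happen later and on other threads, which only makes the gap between the try block's scope and the processor's actual lifetime wider.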

As to init() and close() I think it would make sense to have them pulled out, however, again
due to backwards-compatibility it's not as simple as it sounds.
Fortunately, once Java 7 compatibility is dropped, it will be possible to change their definition
to a default method with an empty body. I think that would be backwards-compatible. That would
leave only one abstract method for Processor and Transformer, process() and transform(), respectively.
Since these are actually *different* from each other, I'd say that then there'd be no repetition.
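
As a rough sketch of what that could look like (Java 8 or later), assuming a hypothetical SketchProcessor interface rather than the real Processor, whose exact future shape is not decided here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface for illustration only; not Kafka's Processor.
// init() and close() are default methods with empty bodies, so process()
// is the single abstract method left.
interface SketchProcessor<K, V> {
    default void init() {
        // no-op unless overridden
    }

    void process(K key, V value);

    default void close() {
        // no-op unless overridden
    }
}

class DefaultMethodSketch {
    static List<String> run() {
        List<String> seen = new ArrayList<>();

        // With one abstract method, a lambda is enough to implement the interface.
        SketchProcessor<String, Integer> processor =
                (key, value) -> seen.add(key + "=" + value);

        processor.init();          // inherited empty default
        processor.process("a", 1);
        processor.process("b", 2);
        processor.close();         // inherited empty default
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a=1, b=2]
    }
}
```

Existing implementations that already define init() and close() would keep compiling, since overriding a default method is allowed; only implementations that want the no-op behaviour get to drop the boilerplate.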

Would that help your use-cases?

> Connect custom processors with the streams DSL
> ----------------------------------------------
>
>                 Key: KAFKA-3455
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3455
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Jonathan Bender
>              Labels: user-experience
>             Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom processors
> with topologies defined from the Streams DSL (and being able to sink data from the processor).
> Possibly this could involve exposing the underlying processor's name in the streams DSL so
> it can be connected with the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
