kafka-dev mailing list archives

From "Joel Koshy (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-249) Separate out Kafka mirroring into a stand-alone app
Date Wed, 28 Mar 2012 21:25:31 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13240742#comment-13240742 ]

Joel Koshy commented on KAFKA-249:

Thanks for the reviews. Further comments inline.

Jun's comments:

> 1. For future extension, I am thinking that we should probably unifying
> KafkaMessageStream and KafkaMessageAndTopicStream to sth like
> KafkaMessageMetadataStream. The stream gives a iterator of Message and its
> associated meta data. For now, the meta data can be just topic. In the
> future, it may include things like partition id and offset. 

That's a good suggestion. I'm not sure whether the change to the existing
createMessageStreams should go into 0.8 rather than trunk, because it is a
fundamental API change that would break existing clients (at compile time).
I can propose this on the mailing list to see if anyone has a preference. If
no one objects, then we can remove it.

> 2. ZookeeperConsumerConnector: 2.1 updateFetcher: no need to pass in
> messagStreams 

Will do.

> 2.2 ZKRebalancerListener: It seems that kafkaMessageStream can be
> immutable. 

It is mutable because it is updated in consumeWildcardTopics.

> 2.3 createMessageStreamByFilter: topicsStreamsMap is empty when passed to
> ZKRebalanceListener. This means that the queue is not cleared during
> rebalance. 

Related to previous comment. The topicsStreamsMap is bootstrapped in
consumeWildCardTopics and updated at every topic event if there are new
allowed topics. So it will be populated before any rebalance occurs.

> 2.4 consumeWildCardTopics: I find it hard to read the code in this method.
> Is there a real benefit to use implicit conversion here, instead of
> explicit conversion? It's not clear to me where the conversion is used.
> The 2-level tuple makes it hard to figure out what the referred fields
> represent. Is the code relying on groupedTopicThreadIds being sorted by
> (topic, threadid)? If so, where is that enforced. 

The map flatten method is a bit confusing. I'm using (and hopefully not
misusing) this variant:

def flatten [B] (implicit asTraversable: ((A, B)) ⇒ TraversableOnce[B]): Traversable[B]

Converts this map of traversable collections into a map in which all element
collections are concatenated.

It basically allows you to take the KV pairs of a map and generate some
traversable collection out of it. Here is how I'm using it: We have a list
of queues (e.g., List(queue1, queue2)) and a map of
consumerThreadIdsPerTopic (e.g.,

  { "topic1" -> Set("topic1-1", "topic1-2"),
    "topic2" -> Set("topic2-1", "topic2-2"),
    "topic3" -> Set("topic3-1", "topic3-2") } ).

From the above I need to create pairs of topic/thread -> queue, like this:

  { ("topic1", "topic1-1") -> queue1,
    ("topic1", "topic1-2") -> queue2,
    ("topic2", "topic2-1") -> queue1,
    ("topic2", "topic2-2") -> queue2,
    ("topic3", "topic3-1") -> queue1,
    ("topic3", "topic3-2") -> queue2 }

This is a bit tricky and I had trouble finding a clearer way to write it.
I agree that the snippet is hard to read; even I have difficulty reading it
now. Still, I think keeping it concise as is, and adding comments such as
the example above to explain what is going on, should help.
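
For comparison, here is a minimal sketch of the same topic/thread -> queue
pairing written with an explicit flatMap instead of the implicit flatten. It
is not the code in the patch. Queue and assign are hypothetical names used
only for illustration, and the sketch assumes that each topic has as many
thread ids as there are queues and that thread ids are paired with queues in
sorted order.

```scala
object TopicThreadQueues {
  // Hypothetical stand-in for the real queue type; only the name matters here.
  final case class Queue(name: String)

  /** Pairs every (topic, threadId) with a queue.
    *
    * Thread ids are sorted so that the pairing is deterministic. zip stops at
    * the shorter of the two lists, so a topic with more thread ids than queues
    * would leave the extra thread ids unassigned (an assumption of this sketch).
    */
  def assign(
      threadIdsPerTopic: Map[String, Set[String]],
      queues: List[Queue]): Map[(String, String), Queue] =
    threadIdsPerTopic.flatMap { case (topic, threadIds) =>
      threadIds.toList.sorted.zip(queues).map { case (threadId, queue) =>
        (topic, threadId) -> queue
      }
    }
}
```

With queues List(Queue("queue1"), Queue("queue2")) and the
consumerThreadIdsPerTopic map above, looking up ("topic2", "topic2-2") in the
result gives Queue("queue2").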

> 3. KafkaServerStartable: Should we remove the embedded consumer now? 

My original thought was that it would be good to keep it around for
fall-back, but I guess it can be removed.

> 4. Utils, UtilsTest: unused import 

Will do.


Neha's comments:

> 1. It seems awkward that there is a MessageStream trait and the only API
> it exposes is clear(). Any reason it doesn't expose the iterator() API ?
> From a user's perspective, one might think, since it is a stream, it would
> expose stream specific APIs too. It will be good to add docs to that API
> to explain exactly what it is meant for.

The only reason it was added was because I have two message stream types
now. Anyway, this will go away if we switch to the common

> 3. There is some critical code that is duplicated in the
> ZookeeperConsumerConnector. consume() and consumeWildcardTopics() have
> some code in common. It would be great if this can be refactored to share
> the logic of registering session expiration listeners, registering watches
> on consumer group changes and topic partition changes.

Will do.

> 4. Could you merge all the logic that wraps the wildcard handling in one
> API ? Right now, it is distributed between createMessageStreamsByFilter
> and consumeWildcardTopics. It will be great if there is one API that will
> pre process the wild cards, create the relevant queues and then call a
> common consume() that has the logic described in item 5 above.

Slightly involved, but it is worth doing.

> 5. There are several new class variables called wildcard* in
> ZookeeperConsumerConnector. I'm thinking they can just be variables local
> to createMessageStreamsByFilter ?

Related to above. consumeWildcardTopics actually needs to access these so
that's why it's global - in this case global makes sense in that you really
wouldn't need to (and currently cannot) make multiple calls to
createMessageStreamsByFilter.  However, it would be good to localize them if
possible to make the code easier to read.

> 6. There is a MessageAndTopic class, that seems to act as a container to
> hold message and other metadata, but only exposes one API to get the
> message. Topic is exposed by making it a public val. Would it make sense
> to either make it a case class or provide consistent APIs for all fields
> it holds ?

Ok, but this will likely go away due to the MessageMetadata discussion.
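
To make the case-class option concrete, here is a minimal sketch. It is not
the class in the patch: Message is a hypothetical placeholder for Kafka's
message type, and describe is an illustrative helper.

```scala
object MessageAndTopicSketch {
  // Hypothetical placeholder for Kafka's message type, included only to keep
  // the sketch self-contained.
  final case class Message(payload: Array[Byte])

  // As a case class, both fields are exposed the same way (mt.message and
  // mt.topic), and the compiler generates toString, copy and an extractor
  // for pattern matching.
  final case class MessageAndTopic(message: Message, topic: String)

  // Illustrative helper showing the generated extractor in use.
  def describe(mt: MessageAndTopic): String = mt match {
    case MessageAndTopic(msg, topic) => topic + ": " + msg.payload.length + " bytes"
  }
}
```

One caveat with this sketch: because the placeholder Message wraps an
Array[Byte], the generated equals compares payloads by reference, not by
content.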

> 7. Since now we seem to have more than one iterators for the consumer,
> would it make sense to rename ConsumerIterator to MessageIterator, and
> TopicalConsumerIterator to MessageAndMetadataIterator ?

Makes sense, but it could break existing users of KafkaMessageStream.  Also,
if we can get rid of KafkaMessageStream and just go with
KafkaMessageAndMetadataStream we will have only one iterator type.

> 8. rat fails on this patch. There are some files without the Apache header

Good catch and reminder that reviews should ideally include running rat. I
do need to add the header for some files.

> Separate out Kafka mirroring into a stand-alone app
> ---------------------------------------------------
>                 Key: KAFKA-249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-249
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Joel Koshy
>            Assignee: Joel Koshy
>             Fix For: 0.7.1
>         Attachments: KAFKA-249.v1.patch, KAFKA-249.v2.patch
> I would like to discuss on this jira, the feasibility/benefits of separating
> out Kafka's mirroring feature from the broker into a stand-alone app, as it
> currently has a couple of limitations and issues.
> For example, we recently had to deal with Kafka mirrors that were in fact
> idle due to the fact that mirror threads were not created at start-up due to
> a rebalancing exception, but the Kafka broker itself did not shutdown. This
> has since been fixed, but is indicative of (avoidable) problems in embedding
> non-broker specific features in the broker.
> Logically, it seems to make sense to separate it out to achieve better
> division of labor.  Furthermore, enhancements to mirroring may be less
> clunky to implement and use with a stand-alone app.  For example to support
> custom partitioning on the target cluster, or to mirror from multiple
> clusters we would probably need to be able to pass in multiple embedded
> consumer/embedded producer configs, which would be less ugly if the
> mirroring process were a stand-alone app.  Also, if we break it out, it
> would be convenient to use as a "consumption engine" for the console
> consumer which will make it easier to add on features such as wildcards in
> topic consumption, since it contains a ZooKeeper topic discovery component.
> Any suggestions and/or objections to this?
