kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joel Koshy <jjkosh...@gmail.com>
Subject API change in consumer
Date Fri, 30 Mar 2012 19:21:26 GMT
Hi all,

This is a follow-up to the email I had sent out a few days ago on the
consumer API extension as part of KAFKA-249 - after the code review, a more
major API change may be more suitable, so here is an overview.

The new method in the consumer connector that supports wildcarding (in the
v2 patch) returns a list of KafkaMessageAndTopicStream[T] objects.  There
were a couple of comments on this:

- It is (somewhat oddly) different from the existing API
  (createMessageStreams which returns a map containing KafkaMessageStream[T]
  objects)
- We already have a MessageAndOffset class, and at some point we may want to
  give consumers access to logical partition/offset information.

So this would be an opportunity to fix the consumer API to accomodate a more
general consumer stream and iterator API, that provide access to
MessageAndMetadata elements, each of which contains the message + metadata
(such as topic, offset, partition, etc.)

So I have incorporated this in a new patch (which I will upload soon after I
address all the other review comments), and I wanted to share the API
changes here since it is a more significant change that would require users
of the consumer and iterator to update their code.

--------------------------------------------------------------------------------

Proposal for the new ConsumerConnector API:

  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pair
   *  @param decoder Decoder to decode each Message to type T
   *  @return a map of (topic, list of  KafkaMessageAndMetadataStream)
pairs.
   *          The number of items in the list is #streams. Each stream
supports
   *          an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageAndMetadataStream[T]]]


  /**
   *  Create a list of message streams for all topics that match a given
filter.
   *
   *  @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec
object.
   *  @param numStreams Number of streams to return
   *  @param decoder Decoder to decode each Message to type T
   *  @return a list of KafkaMessageAndMetadataStream each of which
provides an
   *          iterator over message/metadata pairs over allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new
DefaultDecoder)
    : Seq[KafkaMessageAndMetadataStream[T]]

--------------------------------------------------------------------------------

The KafkaMessageAndMetadataStream[T]'s iterator is a ConsumerIterator[T]
which is an iterator over MessageAndMetadata[T] objects:

case class MessageAndMetadata[T](message: T, topic: String = "", offset:
Long = -1L)

Although the MessageAndMetadata class is simple, it also needs to be evolved
carefully - i.e., adding fields is easy, but removing fields would
effectively break older
clients at compile time).  I think it would be better to avoid schemas
and/or explicit
versioning since that would make writing the client-side code more
difficult.

--------------------------------------------------------------------------------

This means the current pattern of:

for (message <- stream) {
  // process(message)
}

will change to:

for (msgAndMetadata <- stream) {
  // processMessage(msgAndMetadata.message)
  // can also access msgAndMetadata.offset, topic, etc. if appropriate
}

--------------------------------------------------------------------------------

Would love to get any thoughts on this. Given that this is an API
change that would require code changes for consumers, I wanted to send this
around for comments/objections before proceeding further.

Thanks,

Joel

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message