kafka-dev mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Random Partitioning Issue
Date Tue, 01 Oct 2013 15:26:47 GMT
This proposal still doesn't address the following fundamental issue: The
random partitioner cannot select a random and AVAILABLE partition.

So, we have the following two choices.

1. Stick with the current partitioner api.
Then, we have to pick one way to do random partitioning (when key is null).
The current behavior may not be very intuitive, but is one of the possible
behaviors in 0.7.

2. Change the partitioner api so that we can (1) be aware of available
partitions and (2) have pluggable partitioners for doing random
distribution.

Option (2) is probably the right approach. However, it's a non-trivial
change. So, I am not sure if it should be done in 0.8 or not.
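
As a rough illustration of option (2), here is a minimal Scala sketch. The
trait name, the extra availablePartitions parameter, and the implementing
class are all hypothetical; none of this is the actual 0.8 Partitioner api,
and a real change would also have to touch DefaultEventHandler.

```scala
import java.util.concurrent.ThreadLocalRandom

// Hypothetical interface, NOT the 0.8 Partitioner trait. The extra argument
// carries the ids of the partitions that currently have a leader.
trait AvailabilityAwarePartitioner[T] {
  def partition(key: T, numPartitions: Int, availablePartitions: Seq[Int]): Int
}

// Illustrative pluggable implementation: keyed messages are hashed over all
// partitions; null-keyed messages go to a random AVAILABLE partition.
class RandomAvailablePartitioner[T] extends AvailabilityAwarePartitioner[T] {
  def partition(key: T, numPartitions: Int, availablePartitions: Seq[Int]): Int = {
    if (key == null) {
      require(availablePartitions.nonEmpty, "no partition has a leader")
      val idx = ThreadLocalRandom.current().nextInt(availablePartitions.size)
      availablePartitions(idx)
    } else {
      // floorMod keeps the result non-negative even for negative hash codes
      Math.floorMod(key.hashCode, numPartitions)
    }
  }
}
```

With something along these lines, the event handler would pass in the list of
partitions that have a leader, and a different random policy (per send, per
batch, per metadata refresh) could be plugged in as another implementation.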

Thanks,

Jun



On Mon, Sep 30, 2013 at 10:21 PM, Joe Stein <cryptcom@gmail.com> wrote:

> How about making UUID.randomUUID.toString() the default in KeyedMessage
> instead of null if not supplied
>
> def this(topic: String, message: V) = this(topic,
> UUID.randomUUID.toString(),
> message)
>
> and if you want the random refresh behavior then pass in "*" on the
> KeyedMessage construction which we can then later check for in
> DefaultEventHandler
>
>  val partition =
>       if(key =="*") {
>
> we then throw NPE if key == null in KeyedMessage like we do for topic
>
> I believe any null flow control logic is something to shy away from
>
> If this is wrong, too much, or still not the best solution, we could also
> hold over and just put this in the FAQ with the JIRA, so that people who
> run into this and want to randomize (in development / testing, and in the
> many production situations where the producer count is not large enough)
> know they have to pass in their own continuously changing random key. If we
> can get a consensus on what we want to do with minimal changes then I think
> it is important for 0.8; otherwise wait.
>
> On Sun, Sep 29, 2013 at 12:14 PM, Jun Rao <junrao@gmail.com> wrote:
>
> > The main issue is that if we do that, when key is null, we can only select
> > a random partition, but not a random and available partition, without
> > changing the partitioner api. Being able to do the latter is important in
> > my opinion. For example, a user may choose the replication factor of a
> > topic to be 1. If a broker is down, it's much better to select partitions
> > on other brokers for producing than losing messages.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > > On Sat, Sep 28, 2013 at 9:51 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > I think Joe's suggesting that we can remove the checking logic for
> > > key==null in DefaultEventHandler, and do that in partitioner.
> > >
> > > One thing about this idea is any customized partitioner also has to
> > > consider key == null case then.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <junrao@gmail.com> wrote:
> > >
> > > > We have the following code in DefaultEventHandler:
> > > >
> > > > >     val partition =
> > > > >       if(key == null) {
> > > > >         // If the key is null, we don't really need a partitioner
> > > > >         // So we look up in the send partition cache for the topic to
> > > > >         // decide the target partition
> > > > >         val id = sendPartitionPerTopicCache.get(topic)
> > > > >         id match {
> > > > >           case Some(partitionId) =>
> > > > >             // directly return the partitionId without checking
> > > > >             // availability of the leader, since we want to postpone
> > > > >             // the failure until the send operation anyways
> > > > >             partitionId
> > > > >           case None =>
> > > > >             val availablePartitions =
> > > > >               topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > > > >             if (availablePartitions.isEmpty)
> > > > >               throw new LeaderNotAvailableException(
> > > > >                 "No leader for any partition in topic " + topic)
> > > > >             val index = Utils.abs(partitionCounter.getAndIncrement()) %
> > > > >               availablePartitions.size
> > > > >             val partitionId = availablePartitions(index).partitionId
> > > > >             sendPartitionPerTopicCache.put(topic, partitionId)
> > > > >             partitionId
> > > > >         }
> > > > >       } else
> > > > >         partitioner.partition(key, numPartitions)
> > > >
> > > > So, if key is null, the partitioner is ignored.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > > On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <cryptcom@gmail.com> wrote:
> > > >
> > > > > > Hmm, yeah, no, I don't want to do that ... if we don't have to.
> > > > >
> > > > > What if the DefaultPartitioner code looked like this instead =8^)
> > > > >
> > > > > > private class DefaultPartitioner[T](props: VerifiableProperties = null)
> > > > > >   extends Partitioner[T] {
> > > > > >
> > > > > >   def partition(key: T, numPartitions: Int): Int = {
> > > > > >     if (key == null) {
> > > > > >       import java.util.UUID
> > > > > >       // hash the random UUID string so that Utils.abs receives an Int
> > > > > >       Utils.abs(UUID.randomUUID.toString().hashCode) % numPartitions
> > > > > >     }
> > > > > >     else {
> > > > > >       Utils.abs(key.hashCode) % numPartitions
> > > > > >     }
> > > > > >   }
> > > > > > }
> > > > >
> > > > >
> > > > > > Again the goal here is the simple case (often the initial, dev-side,
> > > > > > up-and-running-out-of-the-box one) so folks don't have to randomize
> > > > > > the keys themselves to get this effect
> > > > > >
> > > > > > We would still also have to have a RandomMetaRefreshPartitioner class,
> > > > > > right? So null keys there would wait for the timed refresh for that
> > > > > > use case, right?
> > > > >
> > > > > > private class RandomMetaRefreshPartitioner[T](
> > > > > >     props: VerifiableProperties = null) extends Partitioner[T] {
> > > > >
> > > > >   def partition(key: T, numPartitions: Int): Int = {
> > > > >     Utils.abs(key.hashCode) % numPartitions
> > > > >   }
> > > > > }
> > > > >
> > > > >
> > > > > On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <junrao@gmail.com> wrote:
> > > > >
> > > > > > > However, currently, if key is null, the partitioner is not even called.
> > > > > > > Do you want to change DefaultEventHandler too?
> > > > > > >
> > > > > > > This also doesn't allow the partitioner to select a random and available
> > > > > > > partition, which in my opinion is more important than making partitions
> > > > > > > perfectly evenly balanced.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <cryptcom@gmail.com> wrote:
> > > > > >
> > > > > > > What I was proposing was two fold
> > > > > > >
> > > > > > > 1) revert the DefaultPartitioner class
> > > > > > >
> > > > > > > then
> > > > > > >
> > > > > > > > 2) create a new partitioner that folks could use (like at LinkedIn you
> > > > > > > > would use this partitioner instead) in ProducerConfig
> > > > > > > >
> > > > > > > > private class RandomRefreshTimPartitioner[T](
> > > > > > > >     props: VerifiableProperties = null) extends Partitioner[T] {
> > > > > > >   private val random = new java.util.Random
> > > > > > >
> > > > > > >   def partition(key: T, numPartitions: Int): Int = {
> > > > > > >     Utils.abs(key.hashCode) % numPartitions
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > /*******************************************
> > > > > > >  Joe Stein
> > > > > > >  Founder, Principal Consultant
> > > > > > >  Big Data Open Source Security LLC
> > > > > > >  http://www.stealth.ly
> > > > > > > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > > > > ********************************************/
> > > > > > >
> > > > > > >
> > > > > > > > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <junrao@gmail.com> wrote:
> > > > > > >
> > > > > > > > Joe,
> > > > > > > >
> > > > > > > > > Not sure I fully understand your proposal. Do you want to put the
> > > > > > > > > random partitioning selection logic (for messages without a key) in
> > > > > > > > > the partitioner without changing the partitioner api? That's
> > > > > > > > > difficult. The issue is that in the current partitioner api, we
> > > > > > > > > don't know which partitions are available. For example, if we have
> > > > > > > > > replication factor 1 on a topic and a broker is down, the best thing
> > > > > > > > > to do for the random partitioner is to select an available partition
> > > > > > > > > at random (assuming more than 1 partition is created for the topic).
> > > > > > > > >
> > > > > > > > > Another option is to revert the random partitioning selection logic
> > > > > > > > > in DefaultEventHandler to select a random partition per batch of
> > > > > > > > > events (instead of sticking with a random partition for some
> > > > > > > > > configured amount of time). This is doable, but I am not sure if
> > > > > > > > > it's that critical. Since this is one of the two possible behaviors
> > > > > > > > > in 0.7, it's hard to say whether people will be surprised by that.
> > > > > > > > > Preserving both behaviors in 0.7 will require changing the
> > > > > > > > > partitioner api. This is more work and I agree it's better to do
> > > > > > > > > this post 0.8.0 final.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <cryptcom@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > > Jun, can we hold this extra change over for 0.8.1 and just go with
> > > > > > > > > > reverting to where we were before for the default, with a new
> > > > > > > > > > partitioner for meta refresh, and support both?
> > > > > > > > > >
> > > > > > > > > > I am not sure I entirely understand why someone would need the
> > > > > > > > > > extra functionality you are talking about, which sounds cool
> > > > > > > > > > though... adding it to the API (especially now) without people
> > > > > > > > > > using it may just make folks ask more questions and maybe not use
> > > > > > > > > > it ... IDK ... but in any case we can work on buttoning up 0.8 and
> > > > > > > > > > shipping just the change for two partitioners
> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1067 and circling back,
> > > > > > > > > > if we wanted, on this extra item (including the discussion) in
> > > > > > > > > > 0.8.1 or greater? I am always of the mind of reducing complexity
> > > > > > > > > > unless that complexity is in fact better than not having it.
> > > > > > > > >
> > > > > > > > > > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <junrao@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > > It's reasonable to make the behavior of random producers
> > > > > > > > > > > customizable through a pluggable partitioner. So, if one doesn't
> > > > > > > > > > > care about # of socket connections, one can choose to select a
> > > > > > > > > > > random partition on every send. If one does have many producers,
> > > > > > > > > > > one can choose to periodically select a random partition. To
> > > > > > > > > > > support this, the partitioner api needs to be changed though.
> > > > > > > > > > >
> > > > > > > > > > > Instead of
> > > > > > > > > > >   def partition(key: T, numPartitions: Int): Int
> > > > > > > > > > >
> > > > > > > > > > > we probably need something like the following:
> > > > > > > > > > >   def partition(key: T, numPartitions: Int,
> > > > > > > > > > >                 availablePartitionList: List[Int],
> > > > > > > > > > >                 isNewBatch: Boolean,
> > > > > > > > > > >                 isRefreshMetadata: Boolean): Int
> > > > > > > > > > >
> > > > > > > > > > > availablePartitionList: allows us to select only partitions that
> > > > > > > > > > > are available.
> > > > > > > > > > > isNewBatch: allows us to select the same partition for all
> > > > > > > > > > > messages in a given batch in the async mode.
> > > > > > > > > > > isRefreshMetadata: allows us to implement the policy of switching
> > > > > > > > > > > to a random partition periodically.
> > > > > > > > > > >
> > > > > > > > > > > This will make the partitioner api a bit more complicated. However,
> > > > > > > > > > > it does provide enough information for customization.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <cryptcom@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Sounds good, I will create a JIRA and upload a patch.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > /*******************************************
> > > > > > > > > > >  Joe Stein
> > > > > > > > > > >  Founder, Principal Consultant
> > > > > > > > > > >  Big Data Open Source Security LLC
> > > > > > > > > > >  http://www.stealth.ly
> > > > > > > > > > >  Twitter: @allthingshadoop
> > > > > > > > > > > ********************************************/
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I agree that minimizing the number of producer
> > > connections
> > > > > > (while
> > > > > > > > > > > > being a good thing) is really required in very large
> > > > > production
> > > > > > > > > > > > deployments, and the net-effect of the existing
> change
> > is
> > > > > > > > > > > > counter-intuitive to users who expect an immediate
> even
> > > > > > > > distribution
> > > > > > > > > > > > across _all_ partitions of the topic.
> > > > > > > > > > > >
> > > > > > > > > > > > However, I don't think it is a hack because it is
> > almost
> > > > > > exactly
> > > > > > > > the
> > > > > > > > > > > > same behavior as 0.7 in one of its modes. The 0.7
> > > producer
> > > > > > > (which I
> > > > > > > > > > > > think was even more confusing) had three modes:
> > > > > > > > > > > > i) ZK send
> > > > > > > > > > > > ii) Config send(a): static list of
> > > > > > > broker1:port1,broker2:port2,etc.
> > > > > > > > > > > > iii) Config send(b): static list of a
> > hardwareVIP:VIPport
> > > > > > > > > > > >
> > > > > > > > > > > > (i) and (ii) would achieve even distribution. (iii)
> > would
> > > > > > > > effectively
> > > > > > > > > > > > select one broker and distribute to partitions on
> that
> > > > broker
> > > > > > > > within
> > > > > > > > > > > > each reconnect interval. (iii) is very similar to
> what
> > we
> > > > now
> > > > > > do
> > > > > > > in
> > > > > > > > > > > > 0.8. (Although we stick to one partition during each
> > > > metadata
> > > > > > > > refresh
> > > > > > > > > > > > interval that can be changed to stick to one broker
> and
> > > > > > > distribute
> > > > > > > > > > > > across partitions on that broker).
> > > > > > > > > > > >
> > > > > > > > > > > > At the same time, I agree with Joe's suggestion that
> we
> > > > > should
> > > > > > > keep
> > > > > > > > > > > > the more intuitive pre-KAFKA-1017 behavior as the
> > default
> > > > and
> > > > > > > move
> > > > > > > > > the
> > > > > > > > > > > > change in KAFKA-1017 to a more specific partitioner
> > > > > > > implementation.
> > > > > > > > > > > >
> > > > > > > > > > > > Joel
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <
> > > > > > jay.kreps@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >> Let me ask another question which I think is more
> > > > objective.
> > > > > > > Let's
> > > > > > > > > say
> > > > > > > > > > > 100
> > > > > > > > > > > >> random, smart infrastructure specialists try Kafka,
> of
> > > > these
> > > > > > 100
> > > > > > > > how
> > > > > > > > > > > many
> > > > > > > > > > > >> do you believe will
> > > > > > > > > > > >> 1. Say that this behavior is what they expected to
> > > happen?
> > > > > > > > > > > >> 2. Be happy with this behavior?
> > > > > > > > > > > >> I am not being facetious I am genuinely looking for
> a
> > > > > > numerical
> > > > > > > > > > > estimate. I
> > > > > > > > > > > >> am trying to figure out if nobody thought about this
> > or
> > > if
> > > > > my
> > > > > > > > > estimate
> > > > > > > > > > > is
> > > > > > > > > > > >> just really different. For what it is worth my
> > estimate
> > > > is 0
> > > > > > > and 5
> > > > > > > > > > > >> respectively.
> > > > > > > > > > > >>
> > > > > > > > > > > > >> This would be fine except that we changed it from the good
> > > > > > > > > > > > >> behavior to the bad behavior to fix an issue that probably only
> > > > > > > > > > > > >> we have.
> > > > > > > > > > > >>
> > > > > > > > > > > >> -Jay
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <
> > > > > > jay.kreps@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > > >>> I just took a look at this change. I agree with Joe, not to put
> > > > > > > > > > > > >>> too fine a point on it, but this is a confusing hack.
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Jun, I don't think wanting to minimize the number of TCP
> > > > > > > > > > > > >>> connections is going to be a very common need for people with
> > > > > > > > > > > > >>> less than 10k producers. I also don't think people are going to
> > > > > > > > > > > > >>> get very good load balancing out of this because most people
> > > > > > > > > > > > >>> don't have a ton of producers. I think instead we will spend the
> > > > > > > > > > > > >>> next year explaining this behavior which 99% of people will
> > > > > > > > > > > > >>> think is a bug (because it is crazy, non-intuitive, and breaks
> > > > > > > > > > > > >>> their usage).
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Why was this done by adding special default behavior in the null
> > > > > > > > > > > > >>> key case instead of as a partitioner? The argument that the
> > > > > > > > > > > > >>> partitioner interface doesn't have sufficient information to
> > > > > > > > > > > > >>> choose a partition is not a good argument for hacking in changes
> > > > > > > > > > > > >>> to the default, it is an argument for *improving* the
> > > > > > > > > > > > >>> partitioner interface.
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> The whole point of a partitioner interface is to make it
> > > > > > > > > > > > >>> possible to plug in non-standard behavior like this, right?
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> -Jay
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <
> > > > junrao@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>> Joe,
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Thanks for bringing this up. I want to clarify
> this
> > a
> > > > bit.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> 1. Currently, the producer side logic is that if
> the
> > > > > > > > partitioning
> > > > > > > > > > key
> > > > > > > > > > > is
> > > > > > > > > > > >>>> not provided (i.e., it is null), the partitioner
> > won't
> > > > be
> > > > > > > > called.
> > > > > > > > > We
> > > > > > > > > > > did
> > > > > > > > > > > >>>> that because we want to select a random and
> > > "available"
> > > > > > > > partition
> > > > > > > > > to
> > > > > > > > > > > send
> > > > > > > > > > > >>>> messages so that if some partitions are
> temporarily
> > > > > > > unavailable
> > > > > > > > > > > (because
> > > > > > > > > > > >>>> of
> > > > > > > > > > > >>>> broker failures), messages can still be sent to
> > other
> > > > > > > > partitions.
> > > > > > > > > > > Doing
> > > > > > > > > > > >>>> this in the partitioner is difficult since the
> > > > partitioner
> > > > > > > > doesn't
> > > > > > > > > > > know
> > > > > > > > > > > >>>> which partitions are currently available (the
> > > > > > > > DefaultEventHandler
> > > > > > > > > > > does).
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> 2. As Joel said, the common use case in production
> > is
> > > > that
> > > > > > > there
> > > > > > > > > are
> > > > > > > > > > > many
> > > > > > > > > > > >>>> more producers than #partitions in a topic. In
> this
> > > > case,
> > > > > > > > sticking
> > > > > > > > > > to
> > > > > > > > > > > a
> > > > > > > > > > > >>>> partition for a few minutes is not going to cause
> > too
> > > > much
> > > > > > > > > imbalance
> > > > > > > > > > > in
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>> partitions and has the benefit of reducing the #
> of
> > > > socket
> > > > > > > > > > > connections. My
> > > > > > > > > > > >>>> feeling is that this will benefit most production
> > > users.
> > > > > In
> > > > > > > > fact,
> > > > > > > > > if
> > > > > > > > > > > one
> > > > > > > > > > > >>>> uses a hardware load balancer for producing data
> in
> > > 0.7,
> > > > > it
> > > > > > > > > behaves
> > > > > > > > > > in
> > > > > > > > > > > >>>> exactly the same way (a producer will stick to a
> > > broker
> > > > > > until
> > > > > > > > the
> > > > > > > > > > > >>>> reconnect
> > > > > > > > > > > >>>> interval is reached).
> > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> 3. It is true that if one is testing a topic with more than one
> > > > > > > > > > > > >>>> partition (which is not the default value), this behavior can be
> > > > > > > > > > > > >>>> a bit weird. However, I think it can be mitigated by running
> > > > > > > > > > > > >>>> multiple test producer instances.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> 4. Someone reported in the mailing list that all data shows in
> > > > > > > > > > > > >>>> only one partition after a few weeks. This is clearly not the
> > > > > > > > > > > > >>>> expected behavior. We can take a closer look to see if this is a
> > > > > > > > > > > > >>>> real issue.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Do you think these address your concerns?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Jun
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <
> > > > > > > cryptcom@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>> How about creating a new class called RandomRefreshPartitioner,
> > > > > > > > > > > > >>>>> copying the DefaultPartitioner code to it, and then reverting
> > > > > > > > > > > > >>>>> the DefaultPartitioner code. I appreciate this is a one time
> > > > > > > > > > > > >>>>> burden for folks using the existing 0.8-beta1 who bump into
> > > > > > > > > > > > >>>>> KAFKA-1017 in production and have to switch to the
> > > > > > > > > > > > >>>>> RandomRefreshPartitioner, and folks deploying to production
> > > > > > > > > > > > >>>>> will have to consider this property change.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I make this suggestion keeping in mind the new
> > folks
> > > > that
> > > > > > on
> > > > > > > > > board
> > > > > > > > > > > with
> > > > > > > > > > > >>>>> Kafka and when everyone is in development and
> > testing
> > > > > mode
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > first
> > > > > > > > > > > >>>>> time their experience would be as expected from
> how
> > > it
> > > > > > would
> > > > > > > > work
> > > > > > > > > > in
> > > > > > > > > > > >>>>> production this way.  In dev/test when first
> using
> > > > Kafka
> > > > > > they
> > > > > > > > > won't
> > > > > > > > > > > >>>> have so
> > > > > > > > > > > >>>>> many producers for partitions but would look to
> > > > > parallelize
> > > > > > > > their
> > > > > > > > > > > >>>> consumers
> > > > > > > > > > > >>>>> IMHO.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> The random broker change sounds like maybe a
> bigger
> > > > > change
> > > > > > > now
> > > > > > > > > this
> > > > > > > > > > > late
> > > > > > > > > > > >>>>> in the release cycle if we can accommodate folks
> > > trying
> > > > > > Kafka
> > > > > > > > for
> > > > > > > > > > the
> > > > > > > > > > > >>>> first
> > > > > > > > > > > >>>>> time and through their development and testing
> > along
> > > > with
> > > > > > > full
> > > > > > > > > > blown
> > > > > > > > > > > >>>>> production deploys.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> /*******************************************
> > > > > > > > > > > >>>>> Joe Stein
> > > > > > > > > > > >>>>> Founder, Principal Consultant
> > > > > > > > > > > >>>>> Big Data Open Source Security LLC
> > > > > > > > > > > >>>>> http://www.stealth.ly
> > > > > > > > > > > >>>>> Twitter: @allthingshadoop
> > > > > > > > > > > >>>>> ********************************************/
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy <
> > > > > > jjkoshy.w@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Thanks for bringing this up - it is definitely
> an
> > > > > > important
> > > > > > > > > point
> > > > > > > > > > > to
> > > > > > > > > > > >>>>>>> discuss. The underlying issue of KAFKA-1017 was
> > > > > uncovered
> > > > > > > to
> > > > > > > > > some
> > > > > > > > > > > >>>>> degree by
> > > > > > > > > > > >>>>>>> the fact that in our deployment we did not
> > > > > significantly
> > > > > > > > > increase
> > > > > > > > > > > the
> > > > > > > > > > > >>>>> total
> > > > > > > > > > > >>>>>>> number of partitions over 0.7 - i.e., in 0.7 we
> > had
> > > > say
> > > > > > > four
> > > > > > > > > > > >>>> partitions
> > > > > > > > > > > >>>>> per
> > > > > > > > > > > >>>>>>> broker, now we are using (say) eight partitions
> > > > across
> > > > > > the
> > > > > > > > > > cluster.
> > > > > > > > > > > >>>> So
> > > > > > > > > > > >>>>> with
> > > > > > > > > > > >>>>>>> random partitioning every producer would end up
> > > > > > connecting
> > > > > > > to
> > > > > > > > > > > nearly
> > > > > > > > > > > >>>>> every
> > > > > > > > > > > >>>>>>> broker (unlike 0.7 in which we would connect to
> > > only
> > > > > one
> > > > > > > > broker
> > > > > > > > > > > >>>> within
> > > > > > > > > > > >>>>> each
> > > > > > > > > > > >>>>>>> reconnect interval). In a production-scale
> > > deployment
> > > > > > that
> > > > > > > > > causes
> > > > > > > > > > > the
> > > > > > > > > > > >>>>> high
> > > > > > > > > > > >>>>>>> number of connections that KAFKA-1017
> addresses.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> You are right that the fix of sticking to one
> > > > partition
> > > > > > > over
> > > > > > > > > the
> > > > > > > > > > > >>>>> metadata
> > > > > > > > > > > >>>>>>> refresh interval goes against true consumer
> > > > > parallelism,
> > > > > > > but
> > > > > > > > > this
> > > > > > > > > > > >>>> would
> > > > > > > > > > > >>>>> be
> > > > > > > > > > > >>>>>>> the case only if there are few producers. If
> you
> > > > have a
> > > > > > > > sizable
> > > > > > > > > > > >>>> number
> > > > > > > > > > > >>>>> of
> > > > > > > > > > > >>>>>>> producers on average all partitions would get
> > > uniform
> > > > > > > volumes
> > > > > > > > > of
> > > > > > > > > > > >>>> data.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> One tweak to KAFKA-1017 that I think is
> > reasonable
> > > > > would
> > > > > > be
> > > > > > > > > > instead
> > > > > > > > > > > >>>> of
> > > > > > > > > > > >>>>>>> sticking to a random partition, stick to a
> random
> > > > > broker
> > > > > > > and
> > > > > > > > > send
> > > > > > > > > > > to
> > > > > > > > > > > >>>>> random
> > > > > > > > > > > >>>>>>> partitions within that broker. This would make
> > the
> > > > > > behavior
> > > > > > > > > > closer
> > > > > > > > > > > to
> > > > > > > > > > > >>>>> 0.7
> > > > > > > > > > > >>>>>>> wrt number of connections and random
> partitioning
> > > > > > provided
> > > > > > > > the
> > > > > > > > > > > >>>> number of
> > > > > > > > > > > >>>>>>> partitions per broker is high enough, which is
> > why
> > > I
> > > > > > > > mentioned
> > > > > > > > > > the
> > > > > > > > > > > >>>>>>> partition count (in our usage) in 0.7 vs 0.8
> > above.
> > > > > > > Thoughts?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Joel
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On Friday, September 13, 2013, Joe Stein wrote:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> First, let me apologize for not
> > realizing/noticing
> > > > > this
> > > > > > > > until
> > > > > > > > > > > today.
> > > > > > > > > > > >>>>> One
> > > > > > > > > > > >>>>>>>> reason I left my last company was not being
> paid
> > > to
> > > > > work
> > > > > > > on
> > > > > > > > > > Kafka
> > > > > > > > > > > >>>> nor
> > > > > > > > > > > >>>>>>> being
> > > > > > > > > > > >>>>>>> able to afford any time for a while to work on
> > it.
> > > > Now
> > > > > in
> > > > > > > my
> > > > > > > > > new
> > > > > > > > > > > gig
> > > > > > > > > > > >>>>> (just
> > > > > > > > > > > >>>>>>> wrapped up my first week, woo hoo) while I am
> > still
> > > > not
> > > > > > > "paid
> > > > > > > > > to
> > > > > > > > > > > >>>> work on
> > > > > > > > > > > >>>>>>> Kafka" I can afford some more time for it now
> and
> > > > maybe
> > > > > > in
> > > > > > > 6
> > > > > > > > > > > months I
> > > > > > > > > > > >>>>> will
> > > > > > > > > > > >>>>>>> be able to hire folks to work on Kafka (with
> more
> > > and
> > > > > > more
> > > > > > > > time
> > > > > > > > > > for
> > > > > > > > > > > >>>>> myself
> > > > > > > > > > > >>>>>>> to work on it too) while we also work on client
> > > > > projects
> > > > > > > > > > > (especially
> > > > > > > > > > > >>>>> Kafka
> > > > > > > > > > > >>>>>>> based ones).
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> So, I understand about the changes that were made
> > > > > > > > > > > > >>>>>>> to fix open file handles and make the random
> > > > > > > > > > > > >>>>>>> pinning be time-based (with a very large default
> > > > > > > > > > > > >>>>>>> time).  Got all that.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> But, doesn't this completely negate what has been
> > > > > > > > > > > > >>>>>>> communicated to the community for a very long
> > > > > > > > > > > > >>>>>>> time and the expectation they have? I think it
> > > > > > > > > > > > >>>>>>> does.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> The expected functionality for random partitioning
> > > > > > > > > > > > >>>>>>> is that "This can be done in a round-robin
> > > > > > > > > > > > >>>>>>> fashion simply to balance load" and that the
> > > > > > > > > > > > >>>>>>> "producer" does it for you.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> Isn't a primary use case for partitions to
> > > > > > > > > > > > >>>>>>> parallelize consumers? If so then the expectation
> > > > > > > > > > > > >>>>>>> would be that all consumers would be getting in
> > > > > > > > > > > > >>>>>>> parallel equally in a "round robin fashion" the
> > > > > > > > > > > > >>>>>>> data that was produced for the topic... simply to
> > > > > > > > > > > > >>>>>>> balance load... with the producer handling it and
> > > > > > > > > > > > >>>>>>> with the client application not having to do
> > > > > > > > > > > > >>>>>>> anything. This randomness occurring every 10
> > > > > > > > > > > > >>>>>>> minutes can't balance load.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> If users are going to work around this anyway (as
> > > > > > > > > > > > >>>>>>> I would honestly do too) doing a pseudo semantic
> > > > > > > > > > > > >>>>>>> random key and essentially forcing real
> > > > > > > > > > > > >>>>>>> randomness to simply balance load to my consumers
> > > > > > > > > > > > >>>>>>> running in parallel, would we still end up
> > > > > > > > > > > > >>>>>>> hitting the KAFKA-1017 problem anyway? If not,
> > > > > > > > > > > > >>>>>>> then why can't we just give users the
> > > > > > > > > > > > >>>>>>> functionality and put back the 3 lines of code:
> > > > > > > > > > > > >>>>>>> 1) if(key == null) 2) random.nextInt(numPartitions)
> > > > > > > > > > > > >>>>>>> 3) else ... If we would bump into KAFKA-1017 by
> > > > > > > > > > > > >>>>>>> working around it, then we have not really solved
> > > > > > > > > > > > >>>>>>> the root cause problem and are removing expected
> > > > > > > > > > > > >>>>>>> functionality for a corner case that might have
> > > > > > > > > > > > >>>>>>> other workarounds and/or code changes to solve it
> > > > > > > > > > > > >>>>>>> another way. Or am I still not getting something?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> Also, I was looking at testRandomPartitioner in
> > > > > > > > > > > > >>>>>>> AsyncProducerTest and I don't see how this would
> > > > > > > > > > > > >>>>>>> ever fail; the assertion is always for
> > > > > > > > > > > > >>>>>>> partitionId == 0 and it should be checking that
> > > > > > > > > > > > >>>>>>> data is going to different partitions for a
> > > > > > > > > > > > >>>>>>> topic, right?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> Let me know, I think this is an important
> > > > > > > > > > > > >>>>>>> discussion and even if it ends up as telling the
> > > > > > > > > > > > >>>>>>> community to only use one partition, that is all
> > > > > > > > > > > > >>>>>>> you need, and partitions become our super columns
> > > > > > > > > > > > >>>>>>> (Apache Cassandra joke, it's funny) then we
> > > > > > > > > > > > >>>>>>> manage and support it and that is just how it is.
> > > > > > > > > > > > >>>>>>> But if partitions are a good thing and having
> > > > > > > > > > > > >>>>>>> multiple consumers scale in parallel for a single
> > > > > > > > > > > > >>>>>>> topic is also good then we have to manage and
> > > > > > > > > > > > >>>>>>> support that.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> /*******************************************
> > > > > > > > > > > >>>>>>> Joe Stein
> > > > > > > > > > > >>>>>>> Founder, Principal Consultant
> > > > > > > > > > > >>>>>>> Big Data Open Source Security LLC
> > > > > > > > > > > >>>>>>> http://www.stealth.ly
> > > > > > > > > > > > >>>>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > > > > > > > > >>>>>>> ********************************************/
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
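
For readers comparing the two null-key behaviors debated in the quoted
thread, here is a minimal sketch in Java. It is not the producer's actual
code: the class and method names (PartitionChoiceSketch, countPerMessage,
countSticky, partitionsUsed) are hypothetical, and the time-based refresh
is approximated by a message count (refreshEvery) so that the result is
deterministic. The per-message variant mirrors the quoted
"if(key == null) random.nextInt(numPartitions)" lines; the sticky variant
pins one random partition until the next refresh.

```java
import java.util.Random;

// Hypothetical illustration only; not Kafka's DefaultEventHandler.
class PartitionChoiceSketch {

    // Per-message choice: a new random partition for every null-key message.
    static int[] countPerMessage(int numPartitions, int messages, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[random.nextInt(numPartitions)]++;
        }
        return counts;
    }

    // Sticky choice: pick a random partition and keep it until the next
    // refresh. Assumption: the refresh is modeled as "every refreshEvery
    // messages" instead of a timer.
    static int[] countSticky(int numPartitions, int messages,
                             int refreshEvery, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[numPartitions];
        int current = -1;
        for (int i = 0; i < messages; i++) {
            if (i % refreshEvery == 0) {
                current = random.nextInt(numPartitions);
            }
            counts[current]++;
        }
        return counts;
    }

    // Number of partitions that received at least one message.
    static int partitionsUsed(int[] counts) {
        int used = 0;
        for (int c : counts) {
            if (c > 0) {
                used++;
            }
        }
        return used;
    }

    public static void main(String[] args) {
        int[] perMessage = countPerMessage(4, 1000, 42L);
        int[] sticky = countSticky(4, 1000, 1000, 42L);
        System.out.println("per-message partitions used: "
                + partitionsUsed(perMessage));
        System.out.println("sticky partitions used: "
                + partitionsUsed(sticky));
    }
}
```

Counting how many partitions receive data, rather than asserting a single
partition id, is one way a test could tell the two behaviors apart.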
