kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Colin McCabe <cmcc...@apache.org>
Subject Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient
Date Tue, 22 Aug 2017 22:49:20 GMT
On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> Hi Dong and Jun,
> 
> Thanks again for your input in this discussion and on KIP-113. It's
> difficult that discussion is split between this thread and the one for
> KIP-113, but I'll try to respond on this thread to questions asked on
> this
> thread.
> 
> It seems there is some consensus that the alterTopic() API is the wrong
> thing, and it would make more sense to separate the different kinds of
> alteration into separate APIs. It seems to me there is then a choice.
> 
> 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> reassignPartitions() methods. This would most closely match the
> facilities
> currently offered by kafka-alter-topics and kafka-reassign-partitions.
> 2. Just provide a reassignPartitions() method and infer from the shape of
> the data passed to that that a change in replication factor and/or
> partition count is required, as suggested by Dong.
> 
> The choice we make here is also relevant to KIP-113 and KIP-178. By
> choosing (1) we can change the replication factor or partition count
> without providing an assignment and therefore are necessarily requiring
> the
> controller to make a decision for us about which broker (and, for 113,
> which log directory and thus which disk) the replica should be reside.
> There is then the matter of what criteria the controller should use to
> make
> that decision (a decision is also required on topic creation of course,
> but
> I'll try not to go there right now).

Hmm.  One approach would be to have something like

> TopicAlterationResult alterTopics(TopicAlterationOptions options,
>                                   Map<String, TopicAlteration> alters)
>
> PartitionCountAlteration(int numPartitions) implements TopicAlteration
>
> ReplicationFactorAlteration(int repl) implements TopicAlteration
>
> ReassignPartitionsAlteration(...) implements TopicAlteration


> 
> Choosing (2), on the other hand, forces us to make an assignment, and
> currently the AdminClient doesn't provide the APIs necessary to make a
> very
> informed decision. To do the job properly we'd need APIs to enumerate the
> permissible log directories on each broker, know the current disk usage
> etc. These just don't exist today, and I think it would be a lot of work
> to
> bite off to specify them. We've already got three KIPs on the go.
> 
> So, for the moment, I think we have to choose 1, for pragmatic reasons.
> There is nothing to stop us deprecating alterPartitionCount() and
> alterReplicationFactor() and moving to (2) at a later date.

Yeah, the API should be pragmatic.  An API that doesn't let us do what
we want to do (like let Kafka assign us a new replica on the least
loaded node) is not good.

> 
> To answer a specific questions of Jun's:
> 
> 3. It's not very clear to me what status_time in ReplicaStatusResponse
> > means.
> 
> 
> It corresponds to the ReplicaStatus.statusTime, and the idea was it was
> the
> epoch offset at which the controllers notion of the lag was current
> (which
> I think is the epoch offset of the last FetchRequest from the follower).
> At
> one point I was considering some other ways of determining progress and
> completion, and TBH I think it was more pertinent to those rejected
> alternatives.
> 
> I've already made some changes to KIP-179. As suggested in the thread for
> KIP-113, I will be changing some more since I think we can
> DescribeDirsRequest
> instead of ReplicaStatusResponse to implement the --progress option.

That makes sense.  Whatever response is used, it would be nice to have
an error message in addition to an error code.

best,
Colin

> 
> Cheers,
> 
> Tom
> 
> 
> 
> On 9 August 2017 at 02:28, Jun Rao <jun@confluent.io> wrote:
> 
> > Hi, Tom,
> >
> > Thanks for the KIP. A few minor comments below.
> >
> > 1. Implementation wise, the broker handles adding partitions differently
> > from changing replica assignment. For the former, we directly update the
> > topic path in ZK with the new partitions. For the latter, we write the new
> > partition reassignment in the partition reassignment path. Changing the
> > replication factor is handled in the same way as changing replica
> > assignment. So, it would be useful to document how the broker handles these
> > different cases accordingly. I think it's simpler to just allow one change
> > (partition, replication factor, or replica assignment) in a request.
> >
> > 2. Currently, we only allow adding partitions. We probably want to document
> > the restriction in the api.
> >
> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> > means.
> >
> > Jun
> >
> >
> >
> > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindong28@gmail.com> wrote:
> >
> > > Hey Tom,
> > >
> > > Thanks for your reply. Here are my thoughts:
> > >
> > > 1) I think the DescribeDirsResponse can be used by AdminClient to query
> > the
> > > lag of follower replica as well. Here is how it works:
> > >
> > > - AdminClient sends DescribeDirsRequest to both the leader and the
> > follower
> > > of the partition.
> > > - DescribeDirsResponse from both leader and follower shows the LEO of the
> > > leader replica and the follower replica.
> > > - Lag of the follower replica is the difference in the LEO between leader
> > > and follower.
> > >
> > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be
> > send
> > > to each replica of the partition whereas ReplicaStatusRequest only needs
> > to
> > > be sent to the leader of the partition. It doesn't seem to make much
> > > difference though. In practice we probably want to query the replica lag
> > of
> > > many partitions and AminClient needs to send exactly one request to each
> > > broker with either solution. Does this make sense?
> > >
> > >
> > > 2) KIP-179 proposes to add the following AdminClient API:
> > >
> > > alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions
> > > options)
> > >
> > > Where AlteredTopic includes the following fields
> > >
> > > AlteredTopic(String name, int numPartitions, int replicationFactor,
> > > Map<Integer,List<Integer>> replicasAssignment)
> > >
> > > I have two comments on this:
> > >
> > > - The information in AlteredTopic seems a bit redundant. Both
> > numPartitions
> > > and replicationFactor can be derived from replicasAssignment. I think we
> > > can probably just use the following API in AdminClient instead:
> > >
> > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> > > partitionAssignment, AlterTopicsOptions options)
> > >
> > > - Do you think "reassignPartitions" may be a better name than
> > > "alterTopics"? This is more consistent with the existing name used in
> > > kafka-reassign-partitions.sh and I also find it to be more accurate. On
> > the
> > > other hand, alterTopics seems to suggest that we can also alter topic
> > > config.
> > >
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bentley@gmail.com>
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > You're right that your DescribeDirsResponse contains appropriate data.
> > > The
> > > > comment about the log_end_offset in the KIP says "Enable user to track
> > > > movement progress by comparing LEO of the *.log and *.move". That makes
> > > me
> > > > wonder whether this would only work for replica movement on the same
> > > > broker. In the case of reassigning partitions between brokers it's only
> > > > really the leader for the partition that knows the max LEO of the ISR
> > and
> > > > the LEO of the target broker. Maybe the comment is misleading and it
> > > would
> > > > be the leader doing the same thing in both cases. Can you clarify this?
> > >
> > >
> > > > At a conceptual level there's a lot of similarity between KIP-113 and
> > > > KIP-179 because they're both about moving replicas around. Concretely
> > > > though, ChangeReplicaDirRequest assumes the movement is on the same
> > > broker,
> > > > while the equivalent APIs in KIP-179 (whichever alternative) lack the
> > > > notion of disks. I wonder if a unified API would be better or not.
> > > > Operationally, the reasons for changing replicas between brokers are
> > > likely
> > > > to be motivated by balancing load across the cluster, or
> > adding/removing
> > > > brokers. But the JBOD use case has different motivations. So I suspect
> > > it's
> > > > OK that the APIs are separate, but I'd love to hear what others think.
> > >
> > >
> > > > I think you're right about TopicPartitionReplica. At the protocol level
> > > we
> > > > could group topics and partitions together so avoid having the same
> > topic
> > > > name multiple times when querying for the status of all the partitions
> > > of a
> > > > topic.
> > > >
> > > > Thanks again for taking the time to respond.
> > > >
> > > > Tom
> > > >
> > > > On 3 August 2017 at 23:07, Dong Lin <lindong28@gmail.com> wrote:
> > > >
> > > > > Hey Tom,
> > > > >
> > > > > Thanks for the KIP. It seems that the prior discussion in this thread
> > > has
> > > > > focused on reassigning partitions (or AlterTopics). I haven't looked
> > > into
> > > > > this yet. I have two comments on the replicaStatus() API and the
> > > > > ReplicaStatusRequest:
> > > > >
> > > > > -  It seems that the use-case for ReplicaStatusRequest is covered
by
> > > > > the DescribeDirsRequest introduced in KIP-113
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> > > > > DescribeDirsResponse contains the logEndOffset for the specified
> > > > > partitions. The the lag of the follower replica can be derived as
the
> > > > > difference between leader's LEO and follower's LEO. Do you think
we
> > can
> > > > > simply use DescribeDirsRequest without introducing
> > > ReplicaStatusRequest?
> > > > >
> > > > > - According to the current KIP, replicaStatus() takes a list of
> > > > partitions
> > > > > as input. And its output maps the partition to a list of replica
> > > status.
> > > > Do
> > > > > you think it would be better to have a new class called
> > > > > TopicPartitionReplica, which identifies the topic, partition and
the
> > > > > brokerId. replicaStatus can then take a list of TopicPartitionReplica
> > > as
> > > > > input. And its output maps the replica to replica status. The latter
> > > API
> > > > > seems simpler and also matches the method name better. What do you
> > > think?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t.j.bentley@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi again Ismael,
> > > > > >
> > > > > >
> > > > > > 1. It's worth emphasising that reassigning partitions is a
> > different
> > > > > > >> process than what happens when a topic is created,
so not sure
> > > > trying
> > > > > to
> > > > > > >> make it symmetric is beneficial. In addition to what
was already
> > > > > > >> discussed,
> > > > > > >> one should also enable replication throttling before
moving the
> > > > data.
> > > > > > >
> > > > > > >
> > > > > > > Thanks for your responses.The only advantage I can see
from
> > having
> > > > > > > separate APIs for partition count change vs. the other
changes is
> > > > that
> > > > > we
> > > > > > > expect the former to return synchronously, and the latter
to
> > > > (usually)
> > > > > > > return asynchronously. Lumping them into the same API forces
> > > clients
> > > > of
> > > > > > > that API to code for the two possible cases. Is this the
> > particular
> > > > > > concern
> > > > > > > you had in mind, or do you have others?
> > > > > > >
> > > > > >
> > > > > > Just to help see what this looks like I've created another version
> > of
> > > > > > KIP-179 (
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > > > > > ).
> > > > > >
> > > > > > The AdminClient APIs are more type safe (harder to misuse).
The
> > > > > > asynchronous nature of the alterReplicationFactors() and
> > > > > > reassignPartitions() methods aren't really apparent (you'd still
> > have
> > > > to
> > > > > > read the javadocs to know you had to call replicaStatus() to
> > > determine
> > > > > > completion), but it's still better that the lumped-together
API
> > > because
> > > > > the
> > > > > > methods are _consistently_ async. The Protocol APIs are slightly
> > > nicer
> > > > > too,
> > > > > > in that INVALID_REQUEST is less commonly returned. Another benefit
> > is
> > > > it
> > > > > > would allow for these different alteration APIs to be modified
> > > > > > independently in the future, should that necessary.
> > > > > >
> > > > > > What do others think?
> > > > > >
> > > > > > You're right that the proposal doesn't cover setting and clearing
a
> > > > > > > throttle, and it should. I will study the code about how
this
> > works
> > > > > > before
> > > > > > > posting back about that.
> > > > > > >
> > > > > >
> > > > > > AFAICS from the code the throttle is simply a dynamic config
of the
> > > > > broker.
> > > > > > We already have the AdminClient.alterConfigs API for setting
this,
> > so
> > > > the
> > > > > > refactoring of kafka-reassign-partitions.sh could just use that
> > > > existing
> > > > > > API. We could still achieve your objective of setting the throttle
> > > > before
> > > > > > reassignment by making the --throttle option mandatory when
> > --execute
> > > > was
> > > > > > present. Or did you have something else in mind?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > >
> > > >
> > >
> >

Mime
View raw message