kafka-dev mailing list archives

From: Stanislav Kozlovski <stanis...@confluent.io>
Subject: Re: [DISCUSS] KIP-455 Create an Admin API for Replica Reassignments
Date: Mon, 08 Jul 2019 11:48:30 GMT
Hey Colin,

I've got a couple more minor questions:

AlterPartitionReassignmentsRequest: Should we make the `Topics` and
`Partitions` fields nullable as well, so we can appropriately cancel
pending reassignments at every level of granularity? I think this would
be consistent with the rest of the request and should not be too hard
to implement.
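
For illustration, the relevant part of the request schema could then look roughly like this (just a sketch of the idea -- the field names and `about` strings are not meant to be final):
```
  { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+", "nullableVersions": "0+",
    "about": "The topics to reassign, or null to cancel all pending reassignments.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+", "nullableVersions": "0+",
        "about": "The partitions to reassign, or null to cancel all pending reassignments for this topic." }
    ]
  }
```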

From the API spec, I understand that we never want to fail the whole
`AlterPartitionReassignmentsRequest` if one partition is given an
invalid assignment. This is consistent with the current behavior of
the zNode.
Do we want to add more detail to the `AlterPartitionReassignmentsResponse`?
Currently, we have:
```
 { "name": "Responses", "type": "[]ReassignablePartitionResponse",
"versions": "0+",
      "about": "The responses to partitions to reassign.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code." },
      { "name": "ErrorString", "type": "string", "versions": "0+",
"nullableVersions": "0+",
        "about": "The error string, or null if there was no error." }
      ]
    }
```
I was thinking we should add fields to denote topic/partitions:
```
{ "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
      "about": "The responses to topics to reassign.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
"entityType": "topicName",
          "about": "The topic name" },
        { "name": "Partitions", "type":
"[]ReassignablePartitionResponse", "versions": "0+",
          "about": "The responses to partitions to reassign.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+",
              "about": "The error code." },
            { "name": "ErrorString", "type": "string", "versions":
"0+", "nullableVersions": "0+",
              "about": "The error string, or null if there was no error." }
          ]
        }
      ]
    }
```
to better structure the reassigned partitions and their particular failures.


For the `ListPartitionReassignmentsRequest`, we only have a top-level
error, meaning we would fail the whole request if even one partition
index or topic name is invalid. Is it worth adding a more fine-grained
error level? I assume it's not worth it, since it would make the
response larger and it should be pretty rare for a caller to provide an
invalid topic name or partition index.


To continue the discussion:

> I thought about this a bit more, and I think the easiest way forward is just to make
the shell script incremental.

+1. This was going to be my suggestion as well. I believe this is more
intuitive for the user and easier on our side.

> I guess we could have such a flag, but it can be implemented on the command-line side,
not the broker side.

Yes, I propose adding a `--cancel-all` top-level flag to the tool.
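
For illustration, usage might end up looking something like this (hypothetical -- the exact flag name and connection options would need to be settled in the KIP):
```
# Hypothetical invocation; flag and connection option names are not final
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --cancel-all
```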

> It seems like a fairly unlikely case.  I don't think I feel that strongly about what
we should do in this scenario.

It truly is a very unlikely case, as it means somebody is using both
the old and new APIs, which should be rare in practice. We could keep
the `reassign_partitions` znode up to date with any new-API replica
reassignments for partitions that were in the znode to begin with, but
that might incur a performance hit which may not be worth the tradeoff,
given that the scenario it guards against is very unlikely.
I would suggest we consciously make the design decision that we accept
the possible inconsistency in this case in favor of better performance
and simpler code. What do people think about this?

> The more likely case is that a znode-initiated reassignment for a partition is partly
done and the controller failover happens.  That case will be handled, though, by the same
logic which makes a new reassignment to the current active assignment a no-op.

Could you expand on this? By partly done, are you referring to the
process of updating the `/partitions/${part}/state` zNodes in
accordance with the `reassign_partitions` zNode?
If so, yes, that will be handled, but a new reassignment to the current
active assignment won't be a no-op, I think. My understanding was that
when the controller starts up, it will read the `reassign_partitions`
znode and apply it to the `/states` znode. Therefore, a new
reassignment to the current active assignment will work (it won't be a
no-op) if the controller gets restarted. Is this correct?

> I agree that there is kind of an awkward edge case right when a targetReplica enters
the ISR.  Since the replica set and the ISR reside in two separate znodes, they can't be updated
atomically (well, maybe they could, but I don't think we will in practice).  When the controller
becomes aware that the ISR has changed for the reassigning partition, it will remove the relevant
partition from targetReplicas and add it to replicas.
> This is another ZK write, of course.  I don't think this is really a problem in practice
because when we're creating the client metadata message, we can simply enforce that anything
in the ISR must be in the replica set and not in the targetReplica set.

I agree. Just to be extra clear, are we planning on filtering the ISR
against the replica set when constructing the metadata message? E.g. if
ISR=[1,2,3,4] and Replicas=[1,2,3], the metadata response will contain
[1,2,3] for both collections.
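
Something like this minimal sketch is what I have in mind for the metadata path (purely illustrative, not actual controller code):
```
// Purely illustrative sketch: when building the client-facing metadata,
// drop ISR entries that are not (yet) in the current replica set.
val replicas = Seq(1, 2, 3)   // current replica set
val isr = Seq(1, 2, 3, 4)     // broker 4 has joined the ISR but is still only a target replica
val advertisedIsr = isr.filter(replicas.contains)  // Seq(1, 2, 3)
```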


Let me know what you think and I can update the KIP, so you can focus
on other work.

Best,
Stanislav

On Thu, Jul 4, 2019 at 12:20 AM Colin McCabe <cmccabe@apache.org> wrote:
>
> On Wed, Jul 3, 2019, at 11:22, Stanislav Kozlovski wrote:
> > Hey Colin,
> >
> > Thanks for the detailed reply!
> >
> > > Not really.  Cruise Control, for example, tries to update the znode even when
> > > it's not empty.  This doesn't work, of course.  This is an existing problem with the old API--
> > > which the new API fixes, of course.
> >
> > I am not sure that's the case - I think they don't allow it:
> > https://github.com/linkedin/cruise-control/master/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java#L417
> > Regardless, I agree with your statement that we aren't changing the
> > expected behavior - updating the znode while a reassignment is in
> > progress has no effect now and will continue to and that there's no
> > guarantee that what is in the znode reflects the current reassignments
> > that are going on.
> >
> > > Bob thought we might need a flag to switch between them, but this is not the
> > > case.  Such a flag would add a huge amount of complexity since we'd have to have totally separate
> > > code paths, totally separate ways of storing the metadata, etc. etc.
> >
> > Given your clarifications from the reply, I now have a better
> > understanding of the interaction and agree that supporting both is not
> > as hard as I originally thought
> >
>
> Sounds good.
>
> > > By default, when applying a reassignment plan JSON, the command-line tool will
> > > attempt to cancel any reassignment that isn't part of the plan.
> >
> > I assume this will be implemented by listing all ongoing
> > reassignments, filtering the ones not part of the plan and canceling
> > them. Since we want to have this in our default tool, chances are
> > other tools might want to do something similar. What are your thoughts
> > on moving this logic to the broker? e.g providing this in the API via
> > a "isIncremental" flag or something similar.
> > Some upsides I see is that it might be easier to audit (the broker
> > could log something relevant), easier to implement by tools and may be
> > less prone to race conditions in cases where concurrent requests are
> > mixed up. Although I don't have much experience with protocol design
> > so am curious whether I'm missing something
>
> I thought about this a bit more, and I think the easiest way forward is just to make
> the shell script incremental.
>
> In the past, non-incremental APIs have created big problems for users.  One example is
> AlterConfigs.  It was non-incremental, and this led to people attempting inefficient read-modify-write
> cycles to change single configuration values.  This inefficiency would be a problem for us
> here too -- we'd have to re-specify existing reassignments when using a non-incremental API.
> As the number of reassignments grows, that will get more and more inefficient.
>
> Another issue with AlterConfigs is that when users read the existing config, they didn't
> always get all of it, if they didn't have permission to see it (like with passwords).  So
> the configs were actually lost during the read-modify-write cycle.  This seems like it would
> be a problem in any case where you didn't have permission to list everything.  That isn't
> the case with the proposed reassignment API currently, but if we ever added more than one
> security level, it might be.
>
> Another issue is just collisions between different users using non-incremental APIs.
> I think on the whole it's better just not to have a non-incremental API.
>
> >
> > > Users can cancel all ongoing reassignments by providing an empty JSON file
> > > to the reassignments tool.  You're right that the KIP didn't fully spell out the changes to
> > > the reassignment tool, though, so let me add that.
> >
> > How do you feel about adding a new flag to the .sh tool for explicitly
> > canceling all ongoing reassignments? I think that might be more user
> > friendly and easier to document than the behavior of providing an
> > empty JSON (which could still continue to work as a way to cancel
> > reassignments)
>
> I guess we could have such a flag, but it can be implemented on the command-line side,
> not the broker side.
>
> >
> > > When starting up, if the znode contains a reassignment, the controller will
> > > try to apply it.  This is a bit messy, but better than the alternatives, I think.
> >
> > I think we want to handle the case where:
> > 1. zNode reassignment involving partitionA and partitionB starts
> > 2. API call reassigns partitionA
> > 3. controller failover, should move partitionA to the latest given
> > reassignment and partitionB to the original one
> > I believe we could guarantee this from the server by checking whether
> > the zNode reassignments contains any given partitions and removing
> > them from it if that is the case.
> > Do you think that makes sense?
>
> It seems like a fairly unlikely case.  I don't think I feel that strongly about what
> we should do in this scenario.
>
> The more likely case is that a znode-initiated reassignment for a partition is partly
> done and the controller failover happens.  That case will be handled, though, by the same
> logic which makes a new reassignment to the current active assignment a no-op.
>
> best,
> Colin
>
>
> >
> > Thanks,
> > Stanislav
> >
> > > >
> > > > iiiii. znode is updated to move partitionA, new reassignment triggered via
> > > > API for partitionB, partitionA move finishes
> > > > At this point, do we delete the znode or do we wait until the partitionB
> > > > move finishes as well?
> >
> > > We wait for the partitionB move to finish.  The rationale is that we don't
> > > really ever know what is in ZK (it could change at any time, and our writes to ZK could race
> > > with someone else's writes.)
> >
> > Could you clarify this a bit more, please? We wouldn't care about what
> > is in ZK because we'd be deleting the zNode
> >
> >
> >
> > On Tue, Jul 2, 2019 at 8:04 PM Colin McCabe <cmccabe@apache.org> wrote:
> > >
> > > On Tue, Jul 2, 2019, at 10:47, Stanislav Kozlovski wrote:
> > > > Hey there, I need to start a new thread on KIP-455. I think there might
be
> > > > an issue with the mailing server. For some reason, my replies to the
> > > > previous discussion thread could not be seen by others. After numerous
> > > > attempts, Colin suggested I start a new thread.
> > > >
> > > > Original Discussion Thread:
> > > > https://sematext.com/opensee/m/Kafka/uyzND1Yl7Er128CQu1?subj=+DISCUSS+KIP+455+Create+an+Administrative+API+for+Replica+Reassignment
> > > > KIP:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment
> > > > Last Reply of Previous Thread:
> > > > http://mail-archives.apache.org/mod_mbox/kafka-dev/201906.mbox/%3C679a4c5b-3da6-4556-bb89-e680d8cbb705%40www.fastmail.com%3E
> > >
> > > Thanks, Stan.  Sorry that you had problems getting your email to show up on
the mailing list.  Hopefully infra can figure it out.
> > >
> > > >
> > > > The following is my reply:
> > > > ----
> > > > Hi again,
> > > >
> > > > This has been a great discussion on a tricky KIP. I appreciate everybody's
> > > > involvement in improving this crucial API.
> > > > That being said, I wanted to apologize for my first comment, it was a
bit
> > > > rushed and not thought out.
> > > >
> > > > I've got a few questions now that I dove into this better:
> > > >
> > > > 1. Does it make sense to have an easy way to cancel all ongoing
> > > > reassignments? To cancel all ongoing reassignments, users had the crude
> > > > option of deleting the znode, bouncing the controller and running the
> > > > rollback JSON assignment that kafka-reassign-partitions.sh gave them
> > > > (KAFKA-6304).
> > > > Now that we support multiple reassignment requests, users may add execute
> > > > them incrementally. Suppose something goes horribly wrong and they want
to
> > > > revert as quickly as possible - they would need to run the tool with
> > > > multiple rollback JSONs.  I think that it would be useful to have an easy
> > > > way to stop all ongoing reassignments for emergency situations.
> > >
> > > Users can cancel all ongoing reassignments by providing an empty JSON file
to the reassignments tool.  You're right that the KIP didn't fully spell out the changes to
the reassignment tool, though, so let me add that.
> > >
> > > >
> > > > ---------
> > > >
> > > > 2. Our kafka-reassign-partitions.sh tool doesn't seem to currently let
you
> > > > figure out the ongoing assignments - I guess we expect people to use
> > > > kafka-topics.sh for that. I am not sure how well that would continue to
> > > > work now that we update the replica set only after the new replica joins
> > > > the ISR.
> > > > Do you think it makes sense to add an option for listing the current
> > > > reassignments to the reassign tool as part of this KIP?
> > >
> > > Yes, we definitely want a list command for kafka-reassign-partitions.sh.  I
will add this to the KIP.
> > >
> > > >
> > > > We might want to think whether we want to show the TargetReplicas
> > > > information in the kafka-topics command for completeness as well. That
> > > > might involve the need to update the DescribeTopicsResponse. Personally
I
> > > > can't see a downside but I haven't given it too much thought. I fully
agree
> > > > that we don't want to add the target replicas to the full replica set
and
> > > > nothing useful comes out of telling users they have a replica that might
> > > > not have copied a single byte. Yet, telling them that we have the intention
> > > > of copying bytes sounds useful so maybe having a separate column in
> > > > kafka-topics.sh would provide better clarity?
> > >
> > > Yes, let's add this to kafka-topics.sh.  This can also give us the first use
of Jason's proposed "list reassignments on a subset of all partitions" API, as well.
> > >
> > > >
> > > > ---------
> > > >
> > > > 3. What happens if we do another reassignment to a partition while one
is
> > > > in progress? Do we overwrite the TargetReplicas?
> > > > In the example sequence you gave:
> > > > R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
> > > > What would the behavior be if a new reassign request came with
> > > > TargetReplicas of [7, 8, 9] for that partition?
> > >
> > > Yes, the new reassignment overwrites the old one.
> > >
> > > >
> > > > To avoid complexity and potential race conditions, would it make sense
to
> > > > reject a reassign request once one is in progress for the specific
> > > > partition, essentially forcing the user to cancel it first?
> > > > Forcing the user to cancel has the benefit of being explicit and guarding
> > > > against human mistakes. The downside I can think of is that in some
> > > > scenarios it might be inefficient, e.g
> > > > R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
> > > > Cancel request sent out. Followed by a new reassign request with
> > > > TargetReplicas of [5, 6, 7] (note that 5 and 6 already fully copied the
> > > > partition). Becomes a bit of a race condition of whether we deleted the
> > > > partitions in between requests or not - I assume in practice this won't
be
> > > > an issue. I still feel like I prefer the explicit cancellation step
> > >
> > > A lot of users are using more sophisticated tools than kafka-reassign-partitions.sh.
 Those tools will want to incrementally modify ongoing reassignments without cancelling them.
> > >
> > > >
> > > > ---------
> > > >
> > > > 4. My biggest concern - I want to better touch on the interaction between
> > > > the new API and the current admin/reassign_partitions znode, the
> > > > compatibility and our strategy there.
> > > > The KIP says:
> > > >
> > > > > For compatibility purposes, we will continue to allow assignments
to be
> > > > > submitted through the /admin/reassign_partitions node. Just as with
the
> > > > > current code, this will only be possible if there are no current
> > > > > assignments. In other words, the znode has two states: empty and
waiting
> > > > > for a write, and non-empty because there are assignments in progress.
Once
> > > > > the znode is non-empty, further writes to it will be ignored.
> > > >
> > > > Given the current proposal, I can think of 4 scenarios I want to get a
> > > > better understanding of:
> > > >
> > > > *(i, ii, iii, iiii talk about the reassignment of the same one partition
> > > > only - partitionA)*
> > > >
> > > > i. znode is empty, new reassignment triggered via API, znode is updated
> > > > When the new reassignment is triggered via the API, do we create the znode
> > >
> > > No, we do not ever create the znode.
> > >
> > > > or do we allow a separate tool to trigger another reassignment through
it?
> > >
> > > Yes, other tools can modify ZK and trigger a reassignment that way, if the
znode is not present.
> > >
> > > >
> > > > ii. (assuming we allow creating the znode as with scenario "i"): znode
is
> > > > empty, new reassignment triggered via API, znode is updated, znode is
> > > > DELETED
> > > > My understand is that deleting the znode does not do anything until the
> > > > Controller is bounced - is that correct?
> > >
> > > Deleting the znode doesn't do anything with or without bouncing the controller.
> > >
> > > > If so, this means that nothing will happen. If the Controller is bounced,
> > > > the reassignment state will still be live in the [partitionId]/state znode
> > >
> > > Right.
> > >
> > > >
> > > > iii. znode is updated, new reassignment triggered via API
> > > > We override the reassignment for partitionA. The reassign_partitions znode
> > > > is showing stale data, correct?
> > >
> > > Yes.
> > >
> > > >
> > > > iiii. znode is updated, new reassignment triggered via API, controller
> > > > failover
> > > > What does the controller believe - the [partitionId]/state znode or the
> > > > /reassign_partitions ? I would assume the [partitionId]/state znode since
> > > > in this case we want the reassignment API call to be the correct one.
I
> > > > think that opens up the possibility of missing a freshly-set
> > > > /reassign_partitions though (e.g if it was empty and was set right during
> > > > controller failover)
> > >
> > > When starting up, if the znode contains a reassignment, the controller will
try to apply it.  This is a bit messy, but better than the alternatives, I think.
> > >
> > > >
> > > > iiiii. znode is updated to move partitionA, new reassignment triggered
via
> > > > API for partitionB, partitionA move finishes
> > > > At this point, do we delete the znode or do we wait until the partitionB
> > > > move finishes as well?
> > >
> > > We wait for the partitionB move to finish.  The rationale is that we don't
really ever know what is in ZK (it could change at any time, and our writes to ZK could race
with someone else's writes.)
> > >
> > > >
> > > > From the discussion here:
> > > >
> > > > > There's no guarantee that what is in the znode reflects the current
> > > > > reassignments that are going on.  The only thing you can know is
that if
> > > > > the znode exists, there is at least one reassignment going on.
> > > >
> > > > This is changing the expected behavior of a tool that obeys Kafka's current
> > > > behavior though. It is true that updating the znode while a reassignment
is
> > > > in progress has no effect but make ZK misleading but
> > >
> > > We aren't changing the expected behavior.  Updating the znode while a reassignment
is in progress has no effect now, and it will continue to have no effect.
> > >
> > > > tools might have grown
> > > > to follow that rule and only update the znode once it is empty.
> > >
> > > Not really.  Cruise Control, for example, tries to update the znode even when
it's not empty.  This doesn't work, of course.  This is an existing problem with the old API--
which the new API fixes, of course.
> > >
> > > > I think we might want to be more explicit when making such changes - I
had seen
> > > > discontentment in the community from the fact that we had changed the
znode
> > > > updating behavior in a MINOR pull request.
> > > >
> > > > I feel it is complex to support both APIs and make sure we don't have
> > > > unhandled edge cases. I liked Bob's suggestion on potentially allowing
only
> > > > one via a feature flag:
> > > >
> > > > > Could we temporarily support
> > > > > both, with a config enabling the new behavior to prevent users from
trying
> > > > > to use both mechanisms (if the config is true, the old znode is ignored;
if
> > > > > the config is false, the Admin Client API returns an error indicating
that
> > > > > it is not enabled)?
> > > >
> > > > Perhaps it makes sense to discuss that possibility a bit more?
> > >
> > > I think you are misinterpreting Bob's suggestion here.  My original proposal
was to disable the old API entirely, and he was arguing for having a period where we support
both.  I agreed with him and this is what the KIP now implements -- support for both.
> > >
> > > Bob thought we might need a flag to switch between them, but this is not the
case.  Such a flag would add a huge amount of complexity since we'd have to have totally separate
code paths, totally separate ways of storing the metadata, etc. etc.
> > >
> > > As you know, the old API was not very well specified.  There are a lot of race
conditions and problems with it that are unfixable-- hence the need for a new API in the first
place.  The current proposal tries as hard as possible to be bug-for-bug compatible with the
old API.  If you have a suggestion for improving the compatibility, let me know.  But a flag
for disabling the new API would not be helpful here-- it would just make life harder for tools
that have to operate against multiple different versions of Kafka.
> > >
> > > >
> > > > ---------
> > > >
> > > > 5. ListPartitionReassignments filtering
> > > >
> > > > I guess the thought process here is that most reassignment tools want
to
> > > > > know about all the reassignments that are going on.  If you don't
know all
> > > > > the pending reassignments, then it's hard to say whether adding a
new one
> > > > > is a good idea, or cancelling an existing one.  So I guess I can't
think of
> > > > > a case where a reassignment tool would want a partial set rather
than the
> > > > > full one.
> > > >
> > > >
> > > > I agree with Jason about the UIs having "drill into" options. I believe
we
> > > > should support some sort of ongoing reassignment filtering at the topic
> > > > level (that's the administrative concept people care about).
> > > > An example of a tool that might leverage it is our own
> > > > kafka-reassign-partitions.sh. You can ask that tool to generate a
> > > > reassignment for you from a given list of topics. It currently uses
> > > > `KafkaZkClient#getReplicaAssignmentForTopics()` to get the current
> > > > assignment for the given topics. It would be better if it could use the
new
> > > > ListPartitionsReassignments API to both figure out the current replica
> > > > assignments and whether or not those topics are being reassigned (it could
> > > > log a warning that a reassignment is in progress for those topics).
> > >
> > > Yes, we should support the ability to list reassignments only for specific
partitions.  I will add this to the KIP.  It will look at lot like how MetadataRequest works.
> > >
> > > >
> > > > ---------
> > > >
> > > > and a small nit: We also need to update
> > > > the ListPartitionReassignmentsResponse with the decided
> > > > current/targetReplicas naming
> > >
> > > OK
> > >
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Stanislav
> > > >
> >
> >
> >
> > --
> > Best,
> > Stanislav
> >



-- 
Best,
Stanislav
