kafka-dev mailing list archives

From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-112: Handle disk failure for JBOD
Date Tue, 14 Feb 2017 15:30:07 GMT
Hey Jun,

I just realized that, by asking whether KIP-112 and KIP-113 will be in the
same release, you may be suggesting that a tool for listing offline
directories is necessary for KIP-112. I think such a tool is useful but
doesn't have to be part of KIP-112. As of now, an admin already needs to log
into the broker machine and check the broker log to find the cause of a
broker failure and, in case of disk failure, the bad log directory. KIP-112
won't make this harder, since the admin can still find the bad log directory
the same way. Thus it is probably OK to include this tool only in KIP-113.
Regardless, my hope is to finish both KIPs ASAP and ship them in the same
release, since both are needed for the JBOD setup.

Thanks,
Dong

On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin <lindong28@gmail.com> wrote:

> And the test plan has also been updated to simulate disk failure by
> changing log directory permission to 000.
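For reference, a minimal sketch of that test step, assuming a Python system-test harness on a POSIX filesystem. `simulate_disk_failure` is a hypothetical helper, not part of Kafka's test framework. Permission bits do not restrict root, so the broker under test must run as a non-root user for the chmod to surface as IOExceptions.

```python
import os
import stat
from contextlib import contextmanager


@contextmanager
def simulate_disk_failure(log_dir):
    """Hypothetical helper: chmod a log directory to 000 so that a non-root
    broker gets permission errors (IOException in the JVM) on any access,
    then restore the original mode when the block exits."""
    original_mode = stat.S_IMODE(os.stat(log_dir).st_mode)
    os.chmod(log_dir, 0o000)
    try:
        yield log_dir
    finally:
        # Restore so the directory can be inspected and cleaned up afterwards.
        os.chmod(log_dir, original_mode)
```

A test would wrap the phase in which the broker should detect the failure, e.g. `with simulate_disk_failure("/path/to/log-dir"): ...`, and assert inside the block that the affected replicas are reported offline.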
>
> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindong28@gmail.com> wrote:
>
>> Hi Jun,
>>
>> Thanks for the reply. These comments are very helpful. Let me answer them
>> inline.
>>
>>
>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <jun@confluent.io> wrote:
>>
>>> Hi, Dong,
>>>
>>> Thanks for the reply. A few more replies and new comments below.
>>>
>>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindong28@gmail.com> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Thanks for the detailed comments. Please see answers inline:
>>> >
>>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <jun@confluent.io> wrote:
>>> >
>>> > > Hi, Dong,
>>> > >
>>> > > Thanks for the updated wiki. A few comments below.
>>> > >
>>> > > 1. Topics get created
>>> > > 1.1 Instead of storing successfully created replicas in ZK, could we
>>> > > store unsuccessfully created replicas in ZK? Since the latter is less
>>> > > common, it probably reduces the load on ZK.
>>> > >
>>> >
>>> > We can store unsuccessfully created replicas in ZK, but I am not sure
>>> > that would reduce the write load on ZK.
>>> >
>>> > If we want to reduce the write load on ZK by storing unsuccessfully
>>> > created replicas, then the broker should not write to ZK if all replicas
>>> > are successfully created. This means that if
>>> > /brokers/topics/[topic]/partitions/[partitionId]/controller_managed_state
>>> > doesn't exist in ZK for a given partition, we have to assume all replicas
>>> > of this partition have been successfully created and send
>>> > LeaderAndIsrRequest with create = false. This becomes a problem if the
>>> > controller crashes before receiving the LeaderAndIsrResponse that
>>> > confirms whether a replica has been created.
>>> >
>>> > I think this approach can reduce the number of bytes stored in ZK, but I
>>> > am not sure that is a concern.
>>> >
>>> >
>>> >
>>> I was mostly concerned about the controller failover time. Currently, the
>>> controller failover is likely dominated by the cost of reading
>>> topic/partition level information from ZK. If we add another partition
>>> level path in ZK, it probably will double the controller failover time.
>>> If the approach of representing the non-created replicas doesn't work,
>>> have you considered just adding the created flag in the leaderAndIsr path
>>> in ZK?
>>>
>>>
>> Yes, I have considered adding the created flag in the leaderAndIsr path
>> in ZK. If we were to add a created flag per replica to the
>> LeaderAndIsrRequest, it would require a lot of changes in the code base.
>>
>> If we don't add a created flag per replica to the LeaderAndIsrRequest,
>> then the information in the leaderAndIsr path in ZK and in the
>> LeaderAndIsrRequest would differ. Further, the procedure for the broker to
>> update the ISR in ZK would become more complicated. When the leader
>> updates the leaderAndIsr path in ZK, it would have to first read the
>> created flags from ZK, change the ISR, and write leaderAndIsr back to ZK.
>> It would also need to check the znode version and retry the write if the
>> controller has updated ZK during this period. This is in contrast to the
>> current implementation, where the leader either gets all the information
>> from the LeaderAndIsrRequest sent by the controller or determines it by
>> itself (e.g. ISR) before writing to the leaderAndIsr path in ZK.
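> infromation by itself (e.g. ISR), before">
To make that extra complexity concrete, here is a minimal sketch of the read-modify-write loop described above, assuming the created flags were folded into the leaderAndIsr znode. `InMemoryZnode`, `BadVersionError`, and `leader_update_isr` are hypothetical stand-ins, not Kafka or ZooKeeper client APIs; a real implementation would use ZooKeeper's conditional setData with an expected version.

```python
import json
import threading


class BadVersionError(Exception):
    """Raised when a conditional write sees a stale znode version."""


class InMemoryZnode:
    """Hypothetical stand-in for one ZooKeeper znode: data plus a version
    that increments on every successful write."""

    def __init__(self, data):
        self._data, self._version = data, 0
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._data, self._version

    def set(self, data, expected_version):
        with self._lock:
            if expected_version != self._version:
                raise BadVersionError()
            self._data, self._version = data, self._version + 1
            return self._version


def leader_update_isr(znode, new_isr, max_retries=10):
    """Leader-side ISR update if controller-owned fields (here 'created')
    shared the znode: read, replace only the ISR, write conditionally,
    and re-read and retry if the controller wrote in between."""
    for _ in range(max_retries):
        raw, version = znode.get()
        state = json.loads(raw)
        state["isr"] = list(new_isr)  # the only field the leader owns
        try:
            return znode.set(json.dumps(state), version)
        except BadVersionError:
            continue  # concurrent controller write; start over
    raise RuntimeError("gave up after repeated version conflicts")
```

The retry loop and the need to preserve fields the leader does not own are exactly what a separate controller-only znode avoids.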
>>
>> It seems to me that the above solution is a bit complicated and not
>> clean. Thus I came up with the design in this KIP, which stores the
>> created flag in a separate ZK path. The path is named
>> controller_managed_state to indicate that this znode can hold all
>> information that is managed only by the controller, as opposed to the
>> ISR.
>>
>> I agree with your concern about increased ZK read time during controller
>> failover. How about we store the "created" information in the znode
>> /brokers/topics/[topic]? We can change that znode to have the following
>> data format:
>>
>> {
>>   "version" : 2,
>>   "created" : {
>>     "1" : [1, 2, 3],
>>     ...
>>   },
>>   "partition" : {
>>     "1" : [1, 2, 3],
>>     ...
>>   }
>> }
>>
>> This solution adds no extra ZK reads. It also seems reasonable to put the
>> partition assignment information together with the replica creation
>> information. The latter changes only once, after the partition is created
>> or reassigned.
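As an illustration of how a controller might consume that format, here is a minimal sketch assuming the version-2 payload shown above (the JSON keys come from the proposal; the function `create_flags` and the per-replica create semantics are a hypothetical reading, not the KIP's specification). One read of the topic znode yields both the assignment and the per-replica create decision.

```python
import json


def create_flags(topic_znode_json):
    """Given the proposed version-2 /brokers/topics/[topic] payload, return
    {(partition, replica): create_flag}. A replica already listed under
    "created" gets create=False, so a broker that cannot find it locally
    should report it offline rather than silently create an empty replica."""
    data = json.loads(topic_znode_json)
    created = {int(p): set(rs) for p, rs in data.get("created", {}).items()}
    flags = {}
    for p, replicas in data["partition"].items():
        pid = int(p)
        for r in replicas:
            flags[(pid, r)] = r not in created.get(pid, set())
    return flags
```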
>>
>>
>>>
>>>
>>> >
>>> > > 1.2 If an error is received for a follower, does the controller
>>> > > eagerly remove it from ISR or do we just let the leader remove it
>>> > > after timeout?
>>> > >
>>> >
>>> > No, the controller will not actively remove it from the ISR. But the
>>> > controller will recognize it as an offline replica and propagate this
>>> > information to all brokers via UpdateMetadataRequest. Each leader can
>>> > use this information to actively remove the offline replica from the
>>> > ISR. I have updated the wiki to clarify it.
>>> >
>>> >
>>>
>>> That seems inconsistent with how the controller deals with offline
>>> replicas due to broker failures. When that happens, the controller will
>>> (1) select a new leader if the offline replica is the leader; (2) remove
>>> the replica from ISR if the offline replica is a follower. So,
>>> intuitively, it seems that we should be doing the same thing when dealing
>>> with offline replicas due to disk failure.
>>>
>>
>> My bad. I misunderstood how the controller currently handles broker
>> failure and ISR change. Yes, we should do the same thing when dealing with
>> offline replicas here. I have updated the KIP to specify that, when the
>> controller discovers an offline replica, it removes the replica from the
>> ISR in ZK and sends a LeaderAndIsrRequest with the updated ISR to be used
>> by partition leaders.
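A minimal sketch of that controller-side reaction, mirroring the broker-failure handling described above. `PartitionState` and `handle_offline_replica` are hypothetical names, and choosing the first remaining ISR member as the new leader is a simplifying assumption rather than Kafka's exact election rule.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionState:
    leader: int      # broker id of the current leader, -1 if none
    isr: List[int]   # in-sync replica broker ids


def handle_offline_replica(state, offline_broker):
    """Drop the offline replica from the ISR; if it was the leader, elect
    a new leader from the remaining ISR. The result is what the controller
    would write to ZK and send in a LeaderAndIsrRequest."""
    new_isr = [b for b in state.isr if b != offline_broker]
    if state.leader == offline_broker:
        # Simplification: first remaining ISR member; -1 means no leader.
        return PartitionState(new_isr[0] if new_isr else -1, new_isr)
    return PartitionState(state.leader, new_isr)
```

For example, with leader 1 and ISR [1, 2, 3], losing follower 2 yields leader 1 with ISR [1, 3], while losing leader 1 yields leader 2 with ISR [2, 3].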
>>
>>
>>>
>>>
>>>
>>> >
>>> > > 1.3 Similarly, if an error is received for a leader, should the
>>> > > controller trigger leader election again?
>>> > >
>>> >
>>> > Yes, the controller will trigger leader election if the leader replica
>>> > is offline. I have updated the wiki to clarify it.
>>> >
>>> >
>>> > >
>>> > > 2. A log directory stops working on a broker during runtime:
>>> > > 2.1 It seems the broker remembers the failed directory after hitting an
>>> > > IOException and the failed directory won't be used for creating new
>>> > > partitions until the broker is restarted? If so, could you add that to
>>> > > the wiki.
>>> > >
>>> >
>>> > Right, the broker assumes a log directory to be good after it starts,
>>> > and marks the log directory as bad once it hits an IOException when
>>> > accessing it. New replicas will only be created on good log
>>> > directories. I just added this to the KIP.
>>> >
>>> >
>>> > > 2.2 Could you be a bit more specific on how and during which operation
>>> > > the broker detects directory failure? Is it when the broker hits an
>>> > > IOException during writes, or both reads and writes? For example,
>>> > > during broker startup, it only reads from each of the log directories;
>>> > > if it hits an IOException there, does the broker immediately mark the
>>> > > directory as offline?
>>> > >
>>> >
>>> > The broker marks a log directory as bad once it hits an IOException when
>>> > accessing the log directory. This includes both reads and writes, e.g.
>>> > log append, log read, log cleaning, watermark checkpoint, etc. If the
>>> > broker hits an IOException when it reads from a log directory during
>>> > startup, it immediately marks the directory as offline.
>>> >
>>> > I just updated the KIP to clarify it.
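The rule described in 2.1 and 2.2 can be sketched as follows, assuming Python's OSError plays the role of Java's IOException. `LogDirectories` and its methods are hypothetical; how a good directory is chosen for a new replica (here, simply the first in sorted order) is an assumption, not part of the KIP. The shutdown check reflects the earlier point in this thread that a broker shuts down once all its log directories are bad.

```python
class LogDirectories:
    """Hypothetical model: every configured log directory is assumed good at
    startup, and a directory is marked offline the first time any access
    (read or write) raises an I/O error. It stays offline until restart."""

    def __init__(self, dirs):
        self.online = set(dirs)
        self.offline = set()

    def access(self, log_dir, operation):
        """Run an I/O operation (append, read, cleaning, checkpoint, ...)
        against log_dir; on OSError mark the directory offline and re-raise."""
        if log_dir in self.offline:
            raise OSError(f"log directory {log_dir} is offline")
        try:
            return operation(log_dir)
        except OSError:
            self.online.discard(log_dir)
            self.offline.add(log_dir)
            raise

    def pick_dir_for_new_replica(self):
        """New replicas only go to good directories (placement policy assumed)."""
        return sorted(self.online)[0] if self.online else None

    def should_shut_down(self):
        """The broker shuts down once every log directory is bad."""
        return not self.online
```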
>>> >
>>> >
>>> > > 3. Partition reassignment: If we know a replica is offline, do we still
>>> > > want to send StopReplicaRequest to it?
>>> > >
>>> >
>>> > No, the controller doesn't send StopReplicaRequest for an offline
>>> > replica. The controller treats this scenario in the same way that the
>>> > existing Kafka implementation does when the broker of this replica is
>>> > offline.
>>> >
>>> >
>>> > >
>>> > > 4. UpdateMetadataRequestPartitionState: For offline_replicas, do they
>>> > > only include offline replicas due to log directory failures or do they
>>> > > also include offline replicas due to broker failure?
>>> > >
>>> >
>>> > UpdateMetadataRequestPartitionState's offline_replicas include offline
>>> > replicas due to both log directory failure and broker failure. This is
>>> > to make the semantics of this field easier to understand. A broker can
>>> > distinguish whether a replica is offline due to broker failure or disk
>>> > failure by checking whether the replica's broker is live in the
>>> > UpdateMetadataRequest.
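The check in the last sentence amounts to the following, sketched with a hypothetical function name and plain sets standing in for the request fields (offline_replicas for one partition, plus the live brokers listed in the same UpdateMetadataRequest).

```python
def offline_cause(replica_broker, offline_replicas, live_brokers):
    """Classify one replica of a partition. offline_replicas covers both
    failure modes; the broker-liveness set tells them apart."""
    if replica_broker not in offline_replicas:
        return "online"
    return "disk_failure" if replica_broker in live_brokers else "broker_failure"
```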
>>> >
>>> >
>>> > >
>>> > > 5. Tools: Could we add some kind of support in the tool to list offline
>>> > > directories?
>>> > >
>>> >
>>> > In KIP-112 we don't have tools to list offline directories because we
>>> > have intentionally avoided exposing log directory information (e.g. log
>>> > directory path) to users or other brokers. I think we can add this
>>> > feature in KIP-113, in which we will have DescribeDirsRequest to list log
>>> > directory information (e.g. partition assignment, path, size) needed for
>>> > rebalance.
>>> >
>>> >
>>> Since we are introducing a new failure mode, if a replica becomes offline
>>> due to failure in log directories, the first thing an admin wants to know
>>> is which log directories are offline from the broker's perspective. So,
>>> including such a tool will be useful. Do you plan to do KIP-112 and
>>> KIP-113 in the same release?
>>>
>>>
>> Yes, I agree that including such a tool is useful. It is probably better
>> added in KIP-113 because we need DescribeDirsRequest to get this
>> information. I will update KIP-113 to include this tool.
>>
>> I plan to do KIP-112 and KIP-113 separately to make each KIP and its
>> patch easier to review. I don't have a plan for which release these KIPs
>> will go into; my plan is to finish both of them ASAP. Is there a
>> particular timeline you prefer for the code of these two KIPs to be
>> checked in?
>>
>>
>>> >
>>> > >
>>> > > 6. Metrics: Could we add some metrics to show offline directories?
>>> > >
>>> >
>>> > Sure. I think it makes sense to have each broker report its number of
>>> > offline replicas and offline log directories. The previous metric was
>>> > put in KIP-113. I just added both metrics in KIP-112.
>>> >
>>> >
>>> > >
>>> > > 7. There are still references to kafka-log-dirs.sh. Are they valid?
>>> > >
>>> >
>>> > My bad. I just removed this from "Changes in Operational Procedures" and
>>> > "Test Plan" in the KIP.
>>> >
>>> >
>>> > >
>>> > > 8. Do you think KIP-113 is ready for review? One thing that KIP-113
>>> > > mentions during partition reassignment is to first send
>>> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It seems it's
>>> > > better if the replicas are created in the right log directory in the
>>> > > first place? The reason that I brought it up here is because it may
>>> > > affect the protocol of LeaderAndIsrRequest.
>>> > >
>>> >
>>> > Yes, KIP-113 is ready for review. The advantage of the current design is
>>> > that we can keep LeaderAndIsrRequest log-directory-agnostic. The
>>> > implementation would be much easier to read if all log-related logic
>>> > (e.g. various errors) is put in ChangeReplicaDirRequest and the code path
>>> > for handling replica movement is separated from leadership handling.
>>> >
>>> > In other words, I think Kafka may be easier to develop in the long term
>>> > if we separate these two requests.
>>> >
>>> > I agree that ideally we want to create replicas in the right log
>>> > directory in the first place. But I am not sure there is any performance
>>> > or correctness concern with the existing way of moving a replica after
>>> > it is created. Besides, does this decision affect the change proposed in
>>> > KIP-112?
>>> >
>>> >
>>> I am just wondering if you have considered including the log directory
>>> for the replicas in the LeaderAndIsrRequest.
>>>
>>>
>> Yeah, I have thought about this idea, but only briefly. I rejected it
>> because the log directory is the broker's local information and I prefer
>> not to expose local config information to the cluster through
>> LeaderAndIsrRequest.
>>
>>
>>> 9. Could you describe when the offline replicas due to log directory
>>> failure are removed from the replica fetch threads?
>>>
>>
>> Yes. If the offline replica was a leader, either a new leader is elected
>> or all follower brokers will stop fetching for this partition. If the
>> offline replica is a follower, the broker will stop fetching for this
>> replica immediately. A broker stops fetching data for a replica by removing
>> the replica from the replica fetch threads. I have updated the KIP to
>> clarify it.
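A rough model of "removing the replica from the replica fetch threads", assuming one fetcher per source (leader) broker. `ReplicaFetcherManager` here is a hypothetical Python model that only borrows the name of Kafka's component; dropping fetchers that become idle is also an assumption.

```python
from collections import defaultdict


class ReplicaFetcherManager:
    """Hypothetical model: each fetcher (keyed by leader broker id) holds the
    set of partitions this broker follows from that leader."""

    def __init__(self):
        self.fetchers = defaultdict(set)

    def add(self, leader_broker, partition):
        self.fetchers[leader_broker].add(partition)

    def remove_partitions(self, partitions):
        """Stop fetching the given partitions, e.g. follower replicas on a
        failed log directory, or partitions whose leader went offline with
        no new leader elected."""
        for parts in self.fetchers.values():
            parts.difference_update(partitions)
        # Shut down fetchers that have nothing left to fetch.
        for broker in [b for b, p in self.fetchers.items() if not p]:
            del self.fetchers[broker]
```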
>>
>>
>>>
>>> 10. The wiki mentioned changing the log directory to a file for
>>> simulating disk failure in system tests. Could we just change the
>>> permission of the log directory to 000 to simulate that?
>>>
>>
>>
>> Sure.
>>
>>
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> > > Jun
>>> > >
>>> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <lindong28@gmail.com> wrote:
>>> > >
>>> > > > Hi Jun,
>>> > > >
>>> > > > Can I replace ZooKeeper access with direct RPC for both ISR
>>> > > > notification and disk failure notification in a future KIP, or do you
>>> > > > feel we should do it in this KIP?
>>> > > >
>>> > > > Hi Eno, Grant and everyone,
>>> > > >
>>> > > > Is there further improvement you would like to see with this KIP?
>>> > > >
>>> > > > Thank you all for the comments,
>>> > > >
>>> > > > Dong
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <lindong28@gmail.com> wrote:
>>> > > >
>>> > > > >
>>> > > > >
>>> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe <cmccabe@apache.org> wrote:
>>> > > > >
>>> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote:
>>> > > > >> > Thanks for all the comments Colin!
>>> > > > >> >
>>> > > > >> > To answer your questions:
>>> > > > >> > - Yes, a broker will shut down if all its log directories are bad.
>>> > > > >>
>>> > > > >> That makes sense.  Can you add this to the writeup?
>>> > > > >>
>>> > > > >
>>> > > > > Sure. This has already been added. You can find it here
>>> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=9&selectedPageVersions=10>.
>>> > > > >
>>> > > > >
>>> > > > >>
>>> > > > >> > - I updated the KIP to explicitly state that a log directory will
>>> > > > >> > be assumed to be good until the broker sees an IOException when it
>>> > > > >> > tries to access the log directory.
>>> > > > >>
>>> > > > >> Thanks.
>>> > > > >>
>>> > > > >> > - The controller doesn't explicitly know whether there is a new
>>> > > > >> > log directory or not. All the controller knows is whether replicas
>>> > > > >> > are online or offline based on LeaderAndIsrResponse. In the
>>> > > > >> > existing Kafka implementation, the controller always sends
>>> > > > >> > LeaderAndIsrRequest to a broker after it bounces.
>>> > > > >>
>>> > > > >> I thought so.  It's good to clarify, though.  Do you think it's worth
>>> > > > >> adding a quick discussion of this on the wiki?
>>> > > > >>
>>> > > > >
>>> > > > > Personally I don't think it is needed. If the broker starts with no
>>> > > > > bad log directory, everything should work as it is and we should not
>>> > > > > need to clarify it. The KIP has already covered the scenario when a
>>> > > > > broker starts with a bad log directory. Also, the KIP doesn't claim
>>> > > > > or hint that we support dynamic addition of new log directories. I
>>> > > > > think we are good.
>>> > > > >
>>> > > > >
>>> > > > >> best,
>>> > > > >> Colin
>>> > > > >>
>>> > > > >> >
>>> > > > >> > Please see this
>>> > > > >> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=9&selectedPageVersions=10>
>>> > > > >> > for the change of the KIP.
>>> > > > >> >
>>> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe <cmccabe@apache.org> wrote:
>>> > > > >> >
>>> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote:
>>> > > > >> > > > Thanks, Dong L.
>>> > > > >> > > >
>>> > > > >> > > > Do we plan on bringing down the broker process when all log
>>> > > > >> > > > directories are offline?
>>> > > > >> > > >
>>> > > > >> > > > Can you explicitly state on the KIP that the log dirs are all
>>> > > > >> > > > considered good after the broker process is bounced?  It seems
>>> > > > >> > > > like an important thing to be clear about.  Also, perhaps
>>> > > > >> > > > discuss how the controller becomes aware of the newly good log
>>> > > > >> > > > directories after a broker bounce (and whether this triggers
>>> > > > >> > > > re-election).
>>> > > > >> > >
>>> > > > >> > > I meant to write, all the log dirs where the broker can still read
>>> > > > >> > > the index and some other files.  Clearly, log dirs that are
>>> > > > >> > > completely inaccessible will still be considered bad after a
>>> > > > >> > > broker process bounce.
>>> > > > >> > >
>>> > > > >> > > best,
>>> > > > >> > > Colin
>>> > > > >> > >
>>> > > > >> > > >
>>> > > > >> > > > +1 (non-binding) aside from that
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote:
>>> > > > >> > > > > Hi all,
>>> > > > >> > > > >
>>> > > > >> > > > > Thank you all for the helpful suggestions. I have updated the
>>> > > > >> > > > > KIP to address the comments received so far. See here
>>> > > > >> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=8&selectedPageVersions=9>
>>> > > > >> > > > > to read the changes to the KIP. Here is a summary of the
>>> > > > >> > > > > changes:
>>> > > > >> > > > >
>>> > > > >> > > > > - Updated the Proposed Change section to change the recovery
>>> > > > >> > > > > steps. After this change, the broker will also create replicas
>>> > > > >> > > > > as long as all log directories are working.
>>> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since users no longer
>>> > > > >> > > > > need it for recovery from bad disks.
>>> > > > >> > > > > - Explained how the znode controller_managed_state is managed
>>> > > > >> > > > > in the Public Interfaces section.
>>> > > > >> > > > > - Explained what happens during controller failover, partition
>>> > > > >> > > > > reassignment and topic deletion in the Proposed Change section.
>>> > > > >> > > > > - Updated the Future Work section to include the following
>>> > > > >> > > > > potential improvements:
>>> > > > >> > > > >   - Let the broker notify the controller of ISR changes and disk
>>> > > > >> > > > >   state changes via RPC instead of using ZooKeeper.
>>> > > > >> > > > >   - Handle various failure scenarios (e.g. slow disk) on a
>>> > > > >> > > > >   case-by-case basis. For example, we may want to detect a slow
>>> > > > >> > > > >   disk and consider it offline.
>>> > > > >> > > > >   - Allow an admin to mark a directory as bad so that it will
>>> > > > >> > > > >   not be used.
>>> > > > >> > > > >
>>> > > > >> > > > > Thanks,
>>> > > > >> > > > > Dong
>>> > > > >> > > > >
>>> > > > >> > > > >
>>> > > > >> > > > >
>>> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin <lindong28@gmail.com> wrote:
>>> > > > >> > > > >
>>> > > > >> > > > > > Hey Eno,
>>> > > > >> > > > > >
>>> > > > >> > > > > > Thanks much for the comment!
>>> > > > >> > > > > >
>>> > > > >> > > > > > I still think the complexity added to Kafka is justified by
>>> > > > >> > > > > > its benefit. Let me provide my reasons below.
>>> > > > >> > > > > >
>>> > > > >> > > > > > 1) The additional logic is easy to understand and thus its
>>> > > > >> > > > > > complexity should be reasonable.
>>> > > > >> > > > > >
>>> > > > >> > > > > > On the broker side, it needs to catch exceptions when
>>> > > > >> > > > > > accessing a log directory, mark the log directory and all its
>>> > > > >> > > > > > replicas as offline, notify the controller by writing to the
>>> > > > >> > > > > > ZooKeeper notification path, and specify the error in
>>> > > > >> > > > > > LeaderAndIsrResponse. On the controller side, it will listen
>>> > > > >> > > > > > to ZooKeeper for disk failure notifications, learn about
>>> > > > >> > > > > > offline replicas from the LeaderAndIsrResponse, and take
>>> > > > >> > > > > > offline replicas into consideration when electing leaders. It
>>> > > > >> > > > > > also marks replicas as created in ZooKeeper and uses this to
>>> > > > >> > > > > > determine whether a replica has been created.
>>> > > > >> > > > > >
>>> > > > >> > > > > > That is all the logic we need to add in Kafka. I personally
>>> > > > >> > > > > > feel this is easy to reason about.
>>> > > > >> > > > > >
>>> > > > >> > > > > > 2) The additional code is not much.
>>> > > > >> > > > > >
>>> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 lines of new
>>> > > > >> > > > > > code. Previously I implemented a prototype of a slightly
>>> > > > >> > > > > > different design (see here
>>> > > > >> > > > > > <https://docs.google.com/document/d/1Izza0SBmZMVUBUt9s_-Dqi3D8e0KGJQYW8xgEdRsgAI/edit>)
>>> > > > >> > > > > > and uploaded it to GitHub (see here
>>> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD>). The patch
>>> > > > >> > > > > > changed 33 files, added 1185 lines and deleted 183 lines. The
>>> > > > >> > > > > > prototype patch is actually smaller than the patch for
>>> > > > >> > > > > > KIP-107 (see here <https://github.com/apache/kafka/pull/2476>),
>>> > > > >> > > > > > which is already accepted. The KIP-107 patch changed 49
>>> > > > >> > > > > > files, added 1349 lines and deleted 141 lines.
>>> > > > >> > > > > >
>>> > > > >> > > > > > 3) Comparison with one-broker-per-multiple-volumes
>>> > > > >> > > > > >
>>> > > > >> > > > > > This KIP can improve the availability of Kafka in this case
>>> > > > >> > > > > > such that one failed volume doesn't bring down the entire
>>> > > > >> > > > > > broker.
>>> > > > >> > > > > >
>>> > > > >> > > > > > 4) Comparison with one-broker-per-volume
>>> > > > >> > > > > >
>>> > > > >> > > > > > If each volume maps to multiple disks, then we still have a
>>> > > > >> > > > > > similar problem: the broker will fail if any disk of the
>>> > > > >> > > > > > volume fails.
>>> > > > >> > > > > >
>>> > > > >> > > > > > If each volume maps to one disk, it means that we need to
>>> > > > >> > > > > > deploy 10 brokers on a machine if the machine has 10 disks. I
>>> > > > >> > > > > > will explain the concerns with this approach in order of their
>>> > > > >> > > > > > importance.
>>> > > > >> > > > > >
>>> > > > >> > > > > > - It is weird if we were to tell Kafka users to deploy 50
>>> > > > >> > > > > > brokers on a machine with 50 disks.
>>> > > > >> > > > > >
>>> > > > >> > > > > > - Whether users deploy Kafka on a commercial cloud platform or
>>> > > > >> > > > > > on their own cluster, the size of the largest disk is usually
>>> > > > >> > > > > > limited. There will be scenarios where users want to increase
>>> > > > >> > > > > > broker capacity by having multiple disks per broker. This JBOD
>>> > > > >> > > > > > KIP makes that feasible without hurting availability due to a
>>> > > > >> > > > > > single disk failure.
>>> > > > >> > > > > >
>>> > > > >> > > > > > - Automatic load rebalance across disks will be easier and
>>> > > > >> > > > > > more flexible if one broker has multiple disks. This can be
>>> > > > >> > > > > > future work.
>>> > > > >> > > > > >
>>> > > > >> > > > > > - There is a performance concern when you deploy 10 brokers
>>> > > > >> > > > > > vs. 1 broker on one machine. The request and metadata overhead
>>> > > > >> > > > > > of the cluster, including FetchRequest, ProduceResponse,
>>> > > > >> > > > > > MetadataRequest and so on, will all be 10X more. The
>>> > > > >> > > > > > packets-per-second rate will be 10X higher, which may limit
>>> > > > >> > > > > > performance if pps is the bottleneck. The number of sockets on
>>> > > > >> > > > > > the machine is 10X higher, and the number of replication
>>> > > > >> > > > > > threads will be 100X more. The impact will be more significant
>>> > > > >> > > > > > with an increasing number of disks per machine, so it will
>>> > > > >> > > > > > limit Kafka's scalability in the long term.
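One way to see where a figure like "100X more replication threads" can come from is the back-of-the-envelope count below. It assumes every broker follows partitions led by every other broker and runs one fetcher thread per source broker (as with num.replica.fetchers=1); the function name is illustrative only.

```python
def fetcher_threads_per_machine(machines, brokers_per_machine, fetchers_per_source=1):
    """Replica fetcher threads running on one machine, assuming each broker
    fetches from every other broker in the cluster."""
    total_brokers = machines * brokers_per_machine
    # Each co-located broker runs fetchers toward all other brokers, so the
    # count grows with brokers_per_machine squared.
    return brokers_per_machine * (total_brokers - 1) * fetchers_per_source
```

On a 100-machine cluster this gives 99 threads per machine with one broker per machine and 9990 with ten, a ratio of roughly 101, i.e. approximately the square of the number of brokers per machine.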
>>> > > > >> > > > > >
>>> > > > >> > > > > > Thanks,
>>> > > > >> > > > > > Dong
>>> > > > >> > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
>>> > > > >> > > > > >
>>> > > > >> > > > > >> Hi Dong,
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> To simplify the discussion today, on my part I'll zoom into
>>> > > > >> > > > > >> one thing only:
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> - I'll discuss the options called below: "one-broker-per-disk"
>>> > > > >> > > > > >> or "one-broker-per-few-disks".
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments so there is no
>>> > > > >> > > > > >> need to discuss that part for me. I buy it that JBODs are
>>> > > > >> > > > > >> good.
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> I find the terminology can be improved a bit. Ideally we'd be
>>> > > > >> > > > > >> talking about volumes, not disks, just to make it clear that
>>> > > > >> > > > > >> Kafka understands volumes/directories, not individual raw
>>> > > > >> > > > > >> disks. So by "one-broker-per-few-disks" what I mean is that
>>> > > > >> > > > > >> the admin can pool a few disks together to create a
>>> > > > >> > > > > >> volume/directory and give that to Kafka.
>>> > > > >> > > > > >>
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> The kernel of my question will be that the admin already has
>>> > > > >> > > > > >> tools to 1) create volumes/directories from a JBOD, 2) start a
>>> > > > >> > > > > >> broker on a desired machine and 3) assign a broker resources
>>> > > > >> > > > > >> like a directory. I claim that those tools are sufficient to
>>> > > > >> > > > > >> optimise resource allocation. I understand that a broker could
>>> > > > >> > > > > >> manage point 3) itself, i.e. juggle the directories. My
>>> > > > >> > > > > >> question is whether the complexity added to Kafka is
>>> > > > >> > > > > >> justified. Operationally it seems to me an admin will still
>>> > > > >> > > > > >> have to do all three items above.
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> Looking forward to the discussion
>>> > > > >> > > > > >> Thanks
>>> > > > >> > > > > >> Eno
>>> > > > >> > > > > >>
>>> > > > >> > > > > >>
>>> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin <lindong28@gmail.com> wrote:
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > Hey Eno,
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > Thanks much for the review.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > I think your suggestion is to split the disks of a machine
>>> > > > >> > > > > >> > into multiple disk sets and run one broker per disk set.
>>> > > > >> > > > > >> > Yeah, this is similar to Colin's suggestion of
>>> > > > >> > > > > >> > one-broker-per-disk, which we have evaluated at LinkedIn and
>>> > > > >> > > > > >> > considered to be a good short-term approach.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > As of now I don't think any of these approaches is a better
>>> > > > >> > > > > >> > alternative in the long term. I will summarize them here. I
>>> > > > >> > > > > >> > have put these reasons in the KIP's motivation section and
>>> > > > >> > > > > >> > rejected alternatives section. I am happy to discuss more,
>>> > > > >> > > > > >> > and I would certainly like to use an alternative solution
>>> > > > >> > > > > >> > that is easier to do and has better performance.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from RAID-10 with
>>> > > > >> > > > > >> > replication-factor=2 to JBOD with replication-factor=3, we
>>> > > > >> > > > > >> > get a 25% reduction in disk usage and double the tolerance
>>> > > > >> > > > > >> > of broker failure before data unavailability, from 1 to 2.
>>> > > > >> > > > > >> > This is a pretty huge gain for any company that uses Kafka
>>> > > > >> > > > > >> > at large scale.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit of
>>> > > > >> > > > > >> > one-broker-per-disk is that no major code change is needed
>>> > > > >> > > > > >> > in Kafka. Among the disadvantages of one-broker-per-disk
>>> > > > >> > > > > >> > summarized in the KIP and the previous email with Colin, the
>>> > > > >> > > > > >> > biggest ones are the 15% throughput loss compared to JBOD
>>> > > > >> > > > > >> > and less flexibility to balance across disks. Further, it
>>> > > > >> > > > > >> > probably requires changes to internal deployment tools at
>>> > > > >> > > > > >> > various companies to deal with the one-broker-per-disk
>>> > > > >> > > > > >> > setup.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup used at Microsoft. The problem
>>> > > > >> > > > > >> > is that a broker becomes unavailable if any of its disks fails.
>>> > > > >> > > > > >> > Suppose replication-factor=2 and there are 10 disks per machine. Then
>>> > > > >> > > > > >> > the probability that a given message becomes unavailable due to disk
>>> > > > >> > > > > >> > failure is roughly 100X higher with RAID-0 than with JBOD.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is
>>> > > > >> > > > > >> > somewhere between one-broker-per-disk and RAID-0, so it carries a mix
>>> > > > >> > > > > >> > of the disadvantages of these two approaches.
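The arithmetic behind the RAID-10 and RAID-0 comparisons above can be checked with a short Python sketch. It rests on assumptions that are not in the original message: disks fail independently with the same probability p, the replicas of a partition sit on different brokers, and a message counts as unavailable only when every one of its replicas is lost at the same time. All function names are hypothetical.

```python
def physical_copies(replication_factor: int, raid_copies: int = 1) -> int:
    # Kafka keeps `replication_factor` replicas; RAID-10 mirrors each one again.
    return replication_factor * raid_copies


def broker_failures_tolerated(replication_factor: int) -> int:
    # Data stays readable while at least one replica survives
    # (ignores min.insync.replicas, which matters for writes).
    return replication_factor - 1


def p_unavailable(p_disk: float, disks_in_failure_domain: int, replication_factor: int) -> float:
    """Probability that every replica of a message is lost (assumed independent failures).

    A replica is lost if any disk in its failure domain fails: a RAID-0 volume
    over N disks fails with probability 1 - (1 - p)^N, while under JBOD the
    replica depends on a single disk (disks_in_failure_domain=1).
    """
    p_replica_lost = 1 - (1 - p_disk) ** disks_in_failure_domain
    return p_replica_lost ** replication_factor


# RAID-10 with RF=2 stores 4 copies; JBOD with RF=3 stores 3.
raid10_copies = physical_copies(2, raid_copies=2)
jbod_copies = physical_copies(3)
disk_saving = 1 - jbod_copies / raid10_copies

# RF=2 on a 10-disk RAID-0 volume vs. RF=2 on JBOD, with an assumed p = 1e-4.
ratio = p_unavailable(1e-4, 10, 2) / p_unavailable(1e-4, 1, 2)

print(f"disk saving: {disk_saving:.0%}")
print(f"broker failures tolerated: {broker_failures_tolerated(2)} -> {broker_failures_tolerated(3)}")
print(f"RAID-0 / JBOD unavailability: {ratio:.1f}x")
```

Under these assumptions the RAID-0/JBOD ratio comes out just under 100x, because a 10-disk RAID-0 volume fails with probability close to 10p and the two replicas must both be lost. Setting disks_in_failure_domain to a value between 1 and 10 gives a rough model of one-broker-per-few-disks.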
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > To answer your question, I think it is reasonable to manage disks in
>>> > > > >> > > > > >> > Kafka. By "managing disks" we mean managing the assignment of
>>> > > > >> > > > > >> > replicas across disks. Here are my reasons in more detail:
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - I don't think this KIP is a big step change. Since Kafka already
>>> > > > >> > > > > >> > allows users to configure multiple log directories or disks, it is
>>> > > > >> > > > > >> > implicit that Kafka manages disks; the feature is just not complete.
>>> > > > >> > > > > >> > Microsoft and probably other companies are using this feature despite
>>> > > > >> > > > > >> > the undesirable effect that a broker fails if any of its disks fails.
>>> > > > >> > > > > >> > It would be good to complete this feature.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - I think it is reasonable to manage disks in Kafka. One of the most
>>> > > > >> > > > > >> > important things Kafka does is determine the replica assignment
>>> > > > >> > > > > >> > across brokers and make sure enough copies of a given partition are
>>> > > > >> > > > > >> > available. I would argue that, conceptually, this is not much
>>> > > > >> > > > > >> > different from determining the replica assignment across disks.
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > - I agree that this KIP improves the performance of Kafka, by
>>> > > > >> > > > > >> > switching from RAID-10 to JBOD, at the cost of more complexity inside
>>> > > > >> > > > > >> > Kafka. I would argue that this is the right direction. If we could
>>> > > > >> > > > > >> > gain 20%+ performance by managing NICs in Kafka compared to the
>>> > > > >> > > > > >> > existing approach and other alternatives, I would say we should just
>>> > > > >> > > > > >> > do it. Such a gain in performance, or equivalently a reduction in
>>> > > > >> > > > > >> > cost, can save millions of dollars per year for any company running
>>> > > > >> > > > > >> > Kafka at large scale.
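Dong's description of "managing disks" as assigning replicas across disks can be illustrated with a minimal sketch. It assumes a simple fewest-partitions rule that skips offline directories (similar in spirit, as I understand it, to how Kafka picks a log directory for a new partition, but this is not Kafka's code); LogDir, choose_log_dir and assign are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class LogDir:
    """A hypothetical view of one log directory (one disk under JBOD)."""
    path: str
    online: bool = True
    partitions: Set[str] = field(default_factory=set)


def choose_log_dir(dirs: List[LogDir]) -> LogDir:
    """Pick the online directory holding the fewest partitions (ties go to the first listed)."""
    candidates = [d for d in dirs if d.online]
    if not candidates:
        raise RuntimeError("no online log directory available")
    return min(candidates, key=lambda d: len(d.partitions))


def assign(dirs: List[LogDir], partitions: List[str]) -> Dict[str, str]:
    """Place each new partition replica on a disk and record the choice."""
    placement: Dict[str, str] = {}
    for tp in partitions:
        target = choose_log_dir(dirs)
        target.partitions.add(tp)
        placement[tp] = target.path
    return placement


# /data/d3 is offline (e.g. a failed disk), so nothing is placed on it.
dirs = [LogDir("/data/d1"), LogDir("/data/d2"), LogDir("/data/d3", online=False)]
placement = assign(dirs, ["t-0", "t-1", "t-2", "t-3"])
print(placement)
```

Moving existing replicas between directories, for example after a disk is replaced, is the subject of KIP-113 and is not covered by this sketch.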
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > Thanks,
>>> > > > >> > > > > >> > Dong
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
>>> > > > >> > > > > >> >
>>> > > > >> > > > > >> >> I'm coming somewhat late to the discussion, apologies for that.
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >> >> I'm worried about this proposal. It's moving Kafka to a world where it
>>> > > > >> > > > > >> >> manages disks. So in a sense, the scope of the KIP is limited, but the
>>> > > > >> > > > > >> >> direction it sets for Kafka is quite a big step change. Fundamentally
>>> > > > >> > > > > >> >> this is about balancing resources for a Kafka broker. This can be done
>>> > > > >> > > > > >> >> by a tool, rather than by changing Kafka. E.g., the tool would take a
>>> > > > >> > > > > >> >> bunch of disks together, create a volume over them and export that to
>>> > > > >> > > > > >> >> a Kafka broker (in addition to setting the memory limits for that
>>> > > > >> > > > > >> >> broker or limiting other resources). A different bunch of disks can
>>> > > > >> > > > > >> >> then make up a second volume, and be used by another Kafka broker.
>>> > > > >> > > > > >> >> This is aligned with what Colin is saying (as I understand it).
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >> >> Disks are not the only resource on a machine; there are several
>>> > > > >> > > > > >> >> instances where multiple NICs are used, for example. Do we want
>>> > > > >> > > > > >> >> fine-grained management of all these resources? I'd argue that opens
>>> > > > >> > > > > >> >> the system up to a lot of complexity.
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >> >> Thanks
>>> > > > >> > > > > >> >> Eno
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin <lindong28@gmail.com> wrote:
>>> > > > >> > > > > >> >>>
>>> > > > >> > > > > >> >>> Hi all,
>>> > > > >> > > > > >> >>>
>>> > > > >> > > > > >> >>> I am going to initiate the vote if there is no further concern with
>>> > > > >> > > > > >> >>> the KIP.
>>> > > > >> > > > > >> >>>
>>> > > > >> > > > > >> >>> Thanks,
>>> > > > >> > > > > >> >>> Dong
>>> > > > >> > > > > >> >>>
>>> > > > >> > > > > >> >>>
>>> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenblatt@gmail.com> wrote:
>>> > > > >> > > > > >> >>>
>>> > > > >> > > > > >> >>>> a few extra points:
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>> 1. broker per disk might also incur more client <--> broker sockets:
>>> > > > >> > > > > >> >>>> suppose every producer / consumer "talks" to >1 partition, there's a
>>> > > > >> > > > > >> >>>> very good chance that partitions that were co-located on a single
>>> > > > >> > > > > >> >>>> 10-disk broker would now be split between several single-disk broker
>>> > > > >> > > > > >> >>>> processes on the same machine. hard to put a multiplier on this, but
>>> > > > >> > > > > >> >>>> likely >x1. sockets are a limited resource at the OS level and incur
>>> > > > >> > > > > >> >>>> some memory cost (kernel buffers).
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>> 2. there's a memory overhead to spinning up a JVM (compiled code and
>>> > > > >> > > > > >> >>>> byte code objects etc). if we assume this overhead is ~300 MB (order
>>> > > > >> > > > > >> >>>> of magnitude, specifics vary) then spinning up 10 JVMs would lose you
>>> > > > >> > > > > >> >>>> 3 GB of RAM. not a ton, but non-negligible.
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>> 3. there would also be some overhead downstream of kafka in any
>>> > > > >> > > > > >> >>>> management / monitoring / log aggregation system. likely less than
>>> > > > >> > > > > >> >>>> x10 though.
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>> 4. (related to above) - added complexity of administration with more
>>> > > > >> > > > > >> >>>> running instances.
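radai's first two points reduce to simple arithmetic, sketched below in Python. The sketch assumes, as a simplification, that a client keeps roughly one connection per broker it talks to, and it uses radai's ~300 MB per-JVM order-of-magnitude guess; both function names are hypothetical.

```python
def jvm_overhead_mb(brokers_per_machine: int, per_jvm_mb: int = 300) -> int:
    """Fixed JVM cost per machine, using radai's ~300 MB per-JVM guess."""
    return brokers_per_machine * per_jvm_mb


def max_client_sockets_to_machine(partitions_used: int, brokers_per_machine: int) -> int:
    """Upper bound on one client's sockets to a single machine.

    Assumes (simplification) roughly one connection per broker the client
    talks to, so the count is capped by both the number of partitions the
    client uses on that machine and the number of brokers hosting them.
    """
    if partitions_used <= 0:
        return 0
    return min(partitions_used, brokers_per_machine)


# One 10-disk broker vs. ten single-disk brokers on the same machine.
print(jvm_overhead_mb(1), "MB vs", jvm_overhead_mb(10), "MB")
print(max_client_sockets_to_machine(4, 1), "vs up to", max_client_sockets_to_machine(4, 10))
```

The socket figure is only an upper bound: whether a client actually reaches several brokers on one machine depends on how its partitions happen to be spread, which is why radai declines to put a precise multiplier on it.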
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>> is anyone running kafka with anywhere near 100GB heaps? i thought the
>>> > > > >> > > > > >> >>>> point was to rely on kernel page cache to do the disk buffering ....
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindong28@gmail.com> wrote:
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>>>> Hey Colin,
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> Thanks much for the comment. Please see my comments inline.
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmccabe@apache.org> wrote:
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>>> > > > >> > > > > >> >>>>>>> Hey Colin,
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> Good point! Yeah, we have actually considered and tested this
>>> > > > >> > > > > >> >>>>>>> solution, which we call one-broker-per-disk. It would work and should
>>> > > > >> > > > > >> >>>>>>> require no major change in Kafka as compared to this JBOD KIP. So it
>>> > > > >> > > > > >> >>>>>>> would be a good short-term solution.
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> But it has a few drawbacks which make it less desirable in the long
>>> > > > >> > > > > >> >>>>>>> term. Assume we have 10 disks on a machine. Here are the problems:
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> Hi Dong,
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply.
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> 1) Our stress test results show that one-broker-per-disk has 15%
>>> > > > >> > > > > >> >>>>>>> lower throughput.
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> 2) The controller would need to send 10X as many LeaderAndIsrRequests,
>>> > > > >> > > > > >> >>>>>>> UpdateMetadataRequests and StopReplicaRequests. This increases the
>>> > > > >> > > > > >> >>>>>>> burden on the controller, which can be the performance bottleneck.
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something, but there would not be 10x as
>>> > > > >> > > > > >> >>>>>> many StopReplicaRequest RPCs, would there? The other requests would
>>> > > > >> > > > > >> >>>>>> increase 10x, but from a pretty low base, right? We are not
>>> > > > >> > > > > >> >>>>>> reassigning partitions all the time, I hope (or else we have bigger
>>> > > > >> > > > > >> >>>>>> problems...)
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> I think the controller will group StopReplicaRequests per broker and
>>> > > > >> > > > > >> >>>>> send only one StopReplicaRequest to a broker during controlled
>>> > > > >> > > > > >> >>>>> shutdown. Anyway, we don't have to worry about this if we agree that
>>> > > > >> > > > > >> >>>>> the other requests will increase by 10X. The controller sends one
>>> > > > >> > > > > >> >>>>> UpdateMetadataRequest to each broker in the cluster every time there
>>> > > > >> > > > > >> >>>>> is a leadership change. I am not sure this is a real problem, but in
>>> > > > >> > > > > >> >>>>> theory it makes the overhead O(number of brokers), which may become a
>>> > > > >> > > > > >> >>>>> concern in the future. Ideally we should avoid it.
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical resources on the machine. The
>>> > > > >> > > > > >> >>>>>>> number of sockets on each machine will increase by 10X. The number of
>>> > > > >> > > > > >> >>>>>>> connections between any two machines will increase by 100X.
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> 4) Less efficient management of memory and quotas.
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> 5) Rebalancing between disks/brokers on the same machine will be less
>>> > > > >> > > > > >> >>>>>>> efficient and less flexible. A broker has to read data from another
>>> > > > >> > > > > >> >>>>>>> broker on the same machine via a socket. It is also harder to do
>>> > > > >> > > > > >> >>>>>>> automatic load balancing between disks on the same machine in the
>>> > > > >> > > > > >> >>>>>>> future.
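The counts in points 2) and 3), together with the reply above about the controller batching StopReplicaRequests per broker, can be sketched as follows. The function names are hypothetical, the metadata count follows the thread's description of one update per broker per leadership change, and the connection count is a worst case that assumes every broker talks to every broker on the other machine.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_stop_replica(pending: List[Tuple[int, str]]) -> Dict[int, List[str]]:
    """Batch (broker_id, partition) pairs so each broker gets a single request."""
    batches: Dict[int, List[str]] = defaultdict(list)
    for broker_id, partition in pending:
        batches[broker_id].append(partition)
    return dict(batches)


def metadata_requests_per_leader_change(machines: int, brokers_per_machine: int) -> int:
    # Per the thread: one metadata update goes to every broker in the cluster.
    return machines * brokers_per_machine


def max_connections_between_machines(brokers_per_machine: int) -> int:
    # Worst case: every broker on machine A talks to every broker on machine B.
    return brokers_per_machine ** 2


print(group_stop_replica([(1, "a-0"), (2, "a-1"), (1, "b-0")]))
for b in (1, 10):
    print(b, metadata_requests_per_leader_change(20, b), max_connections_between_machines(b))
```

Batching keeps StopReplicaRequests at one per broker, but with ten brokers per machine there are ten times as many brokers to batch for, so the per-leadership-change metadata fan-out still grows 10X and the machine-to-machine connection bound grows 100X.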
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> I will put this and the explanation in the rejected alternatives
>>> > > > >> > > > > >> >>>>>>> section. I have a few questions:
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> - Can you explain why this solution can help avoid scalability
>>> > > > >> > > > > >> >>>>>>> bottlenecks? I actually think it will exacerbate the scalability
>>> > > > >> > > > > >> >>>>>>> problem due to 2) above.
>>> > > > >> > > > > >> >>>>>>> - Why can we push more RPCs with this solution?
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> To really answer this question we'd have to take a deep dive into the
>>> > > > >> > > > > >> >>>>>> locking of the broker and figure out how effectively it can
>>> > > > >> > > > > >> >>>>>> parallelize truly independent requests. Almost every multithreaded
>>> > > > >> > > > > >> >>>>>> process is going to have shared state, like shared queues or shared
>>> > > > >> > > > > >> >>>>>> sockets, that is going to make scaling less than linear when you add
>>> > > > >> > > > > >> >>>>>> disks or processors. (And clearly, another option is to improve that
>>> > > > >> > > > > >> >>>>>> scalability, rather than going multi-process!)
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> Yeah, I also think it is better to improve scalability inside the
>>> > > > >> > > > > >> >>>>> Kafka code if possible. I am not sure we currently have any
>>> > > > >> > > > > >> >>>>> scalability issue inside Kafka that cannot be removed without going
>>> > > > >> > > > > >> >>>>> multi-process.
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>>> - It is true that a garbage collection in one broker would not affect
>>> > > > >> > > > > >> >>>>>>> others. But that is given that every broker only uses 1/10 of the
>>> > > > >> > > > > >> >>>>>>> memory. Can we be sure that this will actually help performance?
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> The big question is, how much memory do Kafka brokers use now, and
>>> > > > >> > > > > >> >>>>>> how much will they use in the future? Our experience in HDFS was that
>>> > > > >> > > > > >> >>>>>> once you start getting more than 100-200GB Java heap sizes, full GCs
>>> > > > >> > > > > >> >>>>>> start taking minutes to finish when using the standard JVMs. That
>>> > > > >> > > > > >> >>>>>> alone is a good reason to go multi-process or consider storing more
>>> > > > >> > > > > >> >>>>>> things off the Java heap.
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> I see. Now I agree one-broker-per-disk should be more efficient in
>>> > > > >> > > > > >> >>>>> terms of GC, since each broker probably needs less than 1/10 of the
>>> > > > >> > > > > >> >>>>> memory available on a typical machine nowadays. I will remove this
>>> > > > >> > > > > >> >>>>> from the reasons for rejection.
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case. The "hard" case, which is
>>> > > > >> > > > > >> >>>>>> unfortunately also the much more common case, is disk misbehavior.
>>> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks tend to start slowing down
>>> > > > >> > > > > >> >>>>>> unpredictably. Requests that would have completed immediately before
>>> > > > >> > > > > >> >>>>>> start taking 20, 100, or 500 milliseconds. Some files may be readable
>>> > > > >> > > > > >> >>>>>> and other files may not be. System calls hang, sometimes forever, and
>>> > > > >> > > > > >> >>>>>> the Java process can't abort them, because the hang is in the kernel.
>>> > > > >> > > > > >> >>>>>> It is not fun when threads are stuck in "D state" (see
>>> > > > >> > > > > >> >>>>>> http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state).
>>> > > > >> > > > > >> >>>>>> Even kill -9 cannot abort the thread then. Fortunately, this is rare.
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> I agree it is a harder problem and it is rare. We probably don't have
>>> > > > >> > > > > >> >>>>> to worry about it in this KIP since this issue is orthogonal to
>>> > > > >> > > > > >> >>>>> whether or not we use JBOD.
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>> Another approach we should consider is for Kafka to implement its own
>>> > > > >> > > > > >> >>>>>> storage layer that would stripe across multiple disks. This wouldn't
>>> > > > >> > > > > >> >>>>>> have to be done at the block level, but could be done at the file
>>> > > > >> > > > > >> >>>>>> level. We could use consistent hashing to determine which disks a
>>> > > > >> > > > > >> >>>>>> file should end up on, for example.
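Colin's file-level striping idea can be illustrated with a small consistent-hash ring that maps segment file names to disks. This is a hypothetical sketch of the general technique, not anything Kafka implements; the DiskRing class, the SHA-256-based hash and the 64 virtual nodes per disk are all assumptions chosen for the example.

```python
import bisect
import hashlib
from typing import List, Tuple


class DiskRing:
    """Consistent-hash ring that maps file names to disks (illustration only)."""

    def __init__(self, disks: List[str], vnodes: int = 64) -> None:
        if not disks:
            raise ValueError("need at least one disk")
        # Each disk gets `vnodes` points on the ring to even out the load.
        points: List[Tuple[int, str]] = []
        for disk in disks:
            for i in range(vnodes):
                points.append((self._hash(f"{disk}#{i}"), disk))
        points.sort()
        self._keys = [h for h, _ in points]
        self._disks = [d for _, d in points]

    @staticmethod
    def _hash(key: str) -> int:
        return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")

    def disk_for(self, filename: str) -> str:
        # First ring point clockwise from the file's hash, wrapping at the end.
        idx = bisect.bisect(self._keys, self._hash(filename)) % len(self._keys)
        return self._disks[idx]


disks = ["/data/d1", "/data/d2", "/data/d3", "/data/d4"]
files = [f"segment-{n:020d}.log" for n in range(1000)]
before = DiskRing(disks)
after = DiskRing([d for d in disks if d != "/data/d3"])  # d3 has failed
moved = [f for f in files if before.disk_for(f) != after.disk_for(f)]
print(f"{len(moved)} of {len(files)} files moved")
```

When a disk is removed from the ring, only the files that were mapped to it change location; files on the surviving disks stay where they are, which is the property that makes consistent hashing attractive for this kind of placement.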
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>> Are you suggesting that we should distribute logs, or log segments,
>>> > > > >> > > > > >> >>>>> across the disks of a broker? I am not sure I fully understand this
>>> > > > >> > > > > >> >>>>> approach. My gut feeling is that this would be a drastic solution that
>>> > > > >> > > > > >> >>>>> would require non-trivial design. While this may be useful to Kafka, I
>>> > > > >> > > > > >> >>>>> would prefer not to discuss it in detail in this thread unless you
>>> > > > >> > > > > >> >>>>> believe it is strictly superior to the design in this KIP in terms of
>>> > > > >> > > > > >> >>>>> solving our use case.
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>>> best,
>>> > > > >> > > > > >> >>>>>> Colin
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> Thanks,
>>> > > > >> > > > > >> >>>>>>> Dong
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmccabe@apache.org> wrote:
>>> > > > >> > > > > >> >>>>>>>
>>> > > > >> > > > > >> >>>>>>>> Hi Dong,
>>> > > > >> > > > > >> >>>>>>>>
>>> > > > >> > > > > >> >>>>>>>> Thanks for the writeup! It's very interesting.
>>> > > > >> > > > > >> >>>>>>>>
>>> > > > >> > > > > >> >>>>>>>> I apologize in advance if this has been discussed somewhere else. But
>>> > > > >> > > > > >> >>>>>>>> I am curious if you have considered the solution of running multiple
>>> > > > >> > > > > >> >>>>>>>> brokers per node. Clearly there is a memory overhead with this
>>> > > > >> > > > > >> >>>>>>>> solution because of the fixed cost of starting multiple JVMs.
>>> > > > >> > > > > >> >>>>>>>> However, running multiple JVMs would help avoid scalability
>>> > > > >> > > > > >> >>>>>>>> bottlenecks. You could probably push more RPCs per second, for
>>> > > > >> > > > > >> >>>>>>>> example. A garbage collection in one broker would not affect the
>>> > > > >> > > > > >> >>>>>>>> others. It would be interesting to see this considered in the
>>> > > > >> > > > > >> >>>>>>>> "alternate designs" section, even if you end up deciding it's not the
>>> > > > >> > > > > >> >>>>>>>> way to go.
>>> > > > >> > > > > >> >>>>>>>>
>>> > > > >> > > > > >> >>>>>>>> best,
>>> > > > >> > > > > >> >>>>>>>> Colin
>>> > > > >> > > > > >> >>>>>>>>
>>> > > > >> > > > > >> >>>>>>>>
>>> > > > >> > > > > >> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>>> > > > >> > > > > >> >>>>>>>>> Hi all,
>>> > > > >> > > > > >> >>>>>>>>>
>>> > > > >> > > > > >> >>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the KIP
>>> > > > >> > > > > >> >>>>>>>>> wiki at the link
>>> > > > >> > > > > >> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD
>>> > > > >> > > > > >> >>>>>>>>>
>>> > > > >> > > > > >> >>>>>>>>> This KIP is related to KIP-113
>>> > > > >> > > > > >> >>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
>>> > > > >> > > > > >> >>>>>>>>> Support replicas movement between log directories. Both KIPs are
>>> > > > >> > > > > >> >>>>>>>>> needed in order to support JBOD in Kafka. Please help review the KIP.
>>> > > > >> > > > > >> >>>>>>>>> Your feedback is appreciated!
>>> > > > >> > > > > >> >>>>>>>>>
>>> > > > >> > > > > >> >>>>>>>>> Thanks,
>>> > > > >> > > > > >> >>>>>>>>> Dong
>>> > > > >> > > > > >> >>>>>>>>
>>> > > > >> > > > > >> >>>>>>
>>> > > > >> > > > > >> >>>>>
>>> > > > >> > > > > >> >>>>
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >> >>
>>> > > > >> > > > > >>
>>> > > > >> > > > > >>
>>> > > > >> > > > > >
>>> > > > >> > >
>>> > > > >>
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
