From: "Habib Nahas" <habib@hbnet.io>
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-405: Kafka Tiered Storage
Date: Fri, 26 Jul 2019 15:32:43 -0700
Message-Id: <2d8ac78d-a78b-4d1c-97d3-73af23f3247a@www.fastmail.com>
Thanks for the reply, Harsha and Satish. I have a follow-up; I'm fairly
new to this space, so forgive me if there is an obvious answer.

AIUI the current design proposal requires explicit support for
integrating each new remote filesystem, such as GCP Cloud Storage or
S3. If Kafka offered a solution that allows an archive data dir based
on policy, I could mount a FUSE filesystem over, say, GCP Cloud Storage
or S3 and archive my older log segments to that mount. My team could
certainly take advantage of such an implementation over, say,
https://github.com/s3fs-fuse/s3fs-fuse. There are also a large number
of other FUSE filesystems already implemented, which Kafka could take
advantage of without additional work. Of course this could be a
separate KIP as well; please let me know if that is the case.

FUSE (with a list of implemented destinations):
https://en.wikipedia.org/wiki/Filesystem_in_Userspace
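To make the idea concrete, here is a minimal sketch of how small such
an archiver could be when the destination is just a mounted directory.
The class, method, and path names below are hypothetical, not from the
KIP:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    // Hypothetical sketch: archive rolled-over log segments to a
    // directory that may be a FUSE mount (e.g. s3fs-fuse over S3,
    // gcsfuse over GCS). Because the destination is just a filesystem
    // path, one implementation would cover every FUSE-backed store.
    public class MountedDirArchiver {
        // e.g. /mnt/s3-archive (assumed FUSE mount point)
        private final Path archiveDir;

        public MountedDirArchiver(Path archiveDir) {
            this.archiveDir = archiveDir;
        }

        // Copy an inactive segment file into the archive mount,
        // keeping its name. REPLACE_EXISTING makes a retry after a
        // partial copy idempotent.
        public Path archiveSegment(Path segmentFile) throws IOException {
            Path target = archiveDir.resolve(segmentFile.getFileName());
            return Files.copy(segmentFile, target,
                    StandardCopyOption.REPLACE_EXISTING);
        }
    }

Any FUSE backend would then work without further Kafka-side changes.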
Thanks,
Habib

On Thu, Jul 25, 2019, at 8:40 AM, Satish Duggana wrote:
> > Under the proposed definition of RemoteTier, would it be possible
> > to have an implementation that transfers older log segments to a
> > slower storage tier, but one that is still local? Examples of
> > slower local (i.e. mounted locally) tiers being HDDs vs SSDs, or
> > NFS volumes.
>
> No, it does not allow moving logs to different mount points in the
> same broker. In the current design, follower replicas may not have
> the log segments locally available, but they will fetch the remote
> log indexes for older segments. Follower replicas will not replicate
> the data beyond local log retention. So, data that is outside the
> local log retention period/size will be fetched from the remote tier.
>
> Thanks,
> Satish.
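If I'm reading that right, the fetch path would dispatch roughly as in
the sketch below. The interfaces are my own illustrative stand-ins,
not names from the KIP:

    // Illustrative read-path dispatch: offsets still within local
    // retention are served from local disk as today; older offsets
    // fall through to the remote tier.
    public class TieredReadPath {
        interface LocalLog { long localLogStartOffset(); byte[] read(long offset); }
        interface RemoteTier { byte[] read(String topicPartition, long offset); }

        private final LocalLog local;
        private final RemoteTier remote;
        private final String topicPartition;

        TieredReadPath(LocalLog local, RemoteTier remote, String topicPartition) {
            this.local = local;
            this.remote = remote;
            this.topicPartition = topicPartition;
        }

        byte[] read(long offset) {
            if (offset >= local.localLogStartOffset()) {
                return local.read(offset);           // within local retention
            }
            return remote.read(topicPartition, offset); // beyond local retention
        }
    }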
> On Thu, Jul 25, 2019 at 6:01 AM Habib Nahas wrote:
> >
> > Hi,
> >
> > Under the proposed definition of RemoteTier, would it be possible
> > to have an implementation that transfers older log segments to a
> > slower storage tier, but one that is still local? Examples of
> > slower local (i.e. mounted locally) tiers being HDDs vs SSDs, or
> > NFS volumes.
> >
> > Let me know if I'm missing an existing solution for this use case.
> >
> > Thanks,
> > Habib
> >
> > On 2019/04/09 05:04:17, Harsha wrote:
> > > Thanks, Ron. Updating the KIP. Will add answers to your
> > > questions here as well.
> > >
> > > > 1) If the cold storage technology can be cross-region, is
> > > > there a possibility for a disaster recovery Kafka cluster to
> > > > share the messages in cold storage? My guess is the answer is
> > > > no, and messages replicated to the D/R cluster have to be
> > > > migrated to cold storage from there independently. (The same
> > > > cross-region cold storage medium could be used, but every
> > > > message would appear there twice.)
> > >
> > > If I understand the question correctly: if Kafka cluster A
> > > (active) ships logs to remote storage that is replicated
> > > cross-region, will another Kafka cluster B (passive) be able to
> > > use the copied logs directly? For the initial version my answer
> > > is no. We can handle this in subsequent changes after this one.
> > >
> > > > 2) Can/should external (non-Kafka) tools have direct access to
> > > > the messages in cold storage? I think this might have been
> > > > addressed when someone asked about ACLs, and I believe the
> > > > answer is "no" -- if some external tool needs to operate on
> > > > that data then that external tool should read that data by
> > > > acting as a Kafka consumer. Again, just asking to get the
> > > > answer clearly documented in case it is unclear.
> > >
> > > The answer is no. All tools/clients must go through broker APIs
> > > to access any data (local or remote). Only the Kafka broker user
> > > will have access to remote storage logs, and security/ACLs will
> > > work the way they do today. Tools/clients going directly to the
> > > remote storage might help in terms of efficiency, but this
> > > requires protocol changes and some way of syncing ACLs in Kafka
> > > to the remote storage.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:
> > > > Hi Harsha. A couple of questions. I think I know the answers,
> > > > but it would be good to see them explicitly documented.
> > > >
> > > > 1) If the cold storage technology can be cross-region, is
> > > > there a possibility for a disaster recovery Kafka cluster to
> > > > share the messages in cold storage? My guess is the answer is
> > > > no, and messages replicated to the D/R cluster have to be
> > > > migrated to cold storage from there independently. (The same
> > > > cross-region cold storage medium could be used, but every
> > > > message would appear there twice.)
> > > >
> > > > 2) Can/should external (non-Kafka) tools have direct access to
> > > > the messages in cold storage? I think this might have been
> > > > addressed when someone asked about ACLs, and I believe the
> > > > answer is "no" -- if some external tool needs to operate on
> > > > that data then that external tool should read that data by
> > > > acting as a Kafka consumer. Again, just asking to get the
> > > > answer clearly documented in case it is unclear.
> > > >
> > > > Ron
> > > >
> > > > On Thu, Apr 4, 2019 at 12:53 AM Harsha wrote:
> > > > >
> > > > > Hi Viktor,
> > > > >
> > > > > "Now, will the consumer be able to consume a remote segment if:
> > > > > - the remote segment is stored in the remote storage, BUT
> > > > > - the leader broker failed right after this AND
> > > > > - the follower which is to become a leader didn't scan yet
> > > > > for a new segment?"
> > > > >
> > > > > If I understand correctly: a local log segment is copied to
> > > > > remote, the leader fails before writing the index files, and
> > > > > leadership changes to a follower. In this case we consider
> > > > > the log segment copy failed, and the newly elected leader
> > > > > will start copying the data from the last known offset in the
> > > > > remote tier. Consumers looking for an offset that might be in
> > > > > the failed-copy log segment will continue to read the data
> > > > > from local disk, since the local log segment is only deleted
> > > > > after a successful copy of the log segment.
> > > > >
> > > > > "As a follow-up question, what are your experiences, does a
> > > > > failover in a broker cause bigger than usual churn in the
> > > > > consumers? (I'm thinking about the time required to rebuild
> > > > > remote index files.)"
> > > > >
> > > > > Rebuilding remote index files will only happen if the remote
> > > > > storage is missing all the copied index files. Failover will
> > > > > not trigger this rebuild.
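So the recovery step on a newly elected leader would be something like
the following sketch; all names are illustrative, not from the KIP:

    // Sketch of the recovery described above: a newly elected leader
    // asks the remote tier for the last offset that was successfully
    // copied (a segment whose index files were never written counts
    // as a failed copy) and resumes copying from there. Local
    // segments are deleted only after a successful copy, so the
    // "failed" range stays readable from local disk in the meantime.
    public class CopyRecovery {
        interface RemoteTier { long lastCopiedEndOffset(String topicPartition); }
        interface SegmentCopier { void copyFromOffset(long startOffset); }

        static void onBecomeLeader(String topicPartition,
                                   RemoteTier remote,
                                   SegmentCopier copier) {
            long resumeFrom = remote.lastCopiedEndOffset(topicPartition) + 1;
            copier.copyFromOffset(resumeFrom);
        }
    }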
> > > > >
> > > > > Hi Ryan,
> > > > >
> > > > > "Harsha, can you comment on this alternative approach:
> > > > > instead of fetching directly from remote storage via a new
> > > > > API, implement something like paging, where segments are
> > > > > paged in and out of cold storage based on access
> > > > > frequency/recency? For example, when a remote segment is
> > > > > accessed, it could be first fetched to disk and then read
> > > > > from there. I suppose this would require less code changes,
> > > > > or at least less API changes."
> > > > >
> > > > > Copying a whole log segment from remote is inefficient. When
> > > > > tiered storage is enabled, users might prefer hardware with
> > > > > smaller disks, and having to copy the log segment to local
> > > > > disk again -- especially in case of multiple consumers on
> > > > > multiple topics triggering this -- might negatively affect
> > > > > the available local storage. What we proposed in the KIP
> > > > > doesn't affect the existing APIs, and we didn't call for any
> > > > > API changes.
> > > > >
> > > > > "And related to paging, does the proposal address what
> > > > > happens when a broker runs out of HDD space? Maybe we should
> > > > > have a way to configure a max number of segments or bytes
> > > > > stored on each broker, after which older or
> > > > > least-recently-used segments are kicked out, even if they
> > > > > aren't expired per the retention policy? Otherwise, I suppose
> > > > > tiered storage requires some babysitting to ensure that
> > > > > brokers don't run out of local storage, despite having access
> > > > > to potentially unbounded cold storage."
> > > > >
> > > > > Existing Kafka behavior will not change with the addition of
> > > > > tiered storage, and enabling it will not change that behavior
> > > > > either. Just like today, it's up to the operator to make sure
> > > > > disk space is monitored and to take the necessary actions
> > > > > before it becomes a fatal failure for the broker. We don't
> > > > > stop users from configuring the retention period to infinite,
> > > > > and they can easily run out of space. These alternatives were
> > > > > not considered because copying in and out of local disk is
> > > > > not efficient, hence the reason we didn't add them to
> > > > > Alternatives Considered :).
> > > > >
> > > > > Thanks,
> > > > > Harsha
> > > > >
> > > > > On Wed, Apr 3, 2019, at 7:51 AM, Ryanne Dolan wrote:
> > > > > > Harsha, can you comment on this alternative approach:
> > > > > > instead of fetching directly from remote storage via a new
> > > > > > API, implement something like paging, where segments are
> > > > > > paged in and out of cold storage based on access
> > > > > > frequency/recency? For example, when a remote segment is
> > > > > > accessed, it could be first fetched to disk and then read
> > > > > > from there. I suppose this would require less code changes,
> > > > > > or at least less API changes.
> > > > > >
> > > > > > And related to paging, does the proposal address what
> > > > > > happens when a broker runs out of HDD space? Maybe we
> > > > > > should have a way to configure a max number of segments or
> > > > > > bytes stored on each broker, after which older or
> > > > > > least-recently-used segments are kicked out, even if they
> > > > > > aren't expired per the retention policy? Otherwise, I
> > > > > > suppose tiered storage requires some babysitting to ensure
> > > > > > that brokers don't run out of local storage, despite having
> > > > > > access to potentially unbounded cold storage.
> > > > > >
> > > > > > Just some things to add to Alternatives Considered :)
> > > > > >
> > > > > > Ryanne
> > > > > >
> > > > > > On Wed, Apr 3, 2019 at 8:21 AM Viktor Somogyi-Vass
> > > > > > <viktorsomogyi@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Harsha,
> > > > > > >
> > > > > > > Thanks for the answer, makes sense.
> > > > > > > In the meantime one edge case popped up in my mind, but
> > > > > > > first let me summarize what I understand, if I interpret
> > > > > > > your KIP correctly.
> > > > > > >
> > > > > > > So basically whenever the leader RSM copies over a
> > > > > > > segment to the remote storage, the leader RLM will append
> > > > > > > an entry to its remote index files with the remote
> > > > > > > position. After this, LogManager can delete the local
> > > > > > > segment. Parallel to this, RLM followers are periodically
> > > > > > > scanning the remote storage for files, and if they find a
> > > > > > > new one they update their indices.
> > > > > > >
> > > > > > > Now, will the consumer be able to consume a remote
> > > > > > > segment if:
> > > > > > > - the remote segment is stored in the remote storage, BUT
> > > > > > > - the leader broker failed right after this AND
> > > > > > > - the follower which is to become a leader didn't scan
> > > > > > > yet for a new segment?
> > > > > > > Would this result in an OffsetOutOfRangeException, or
> > > > > > > would the failover halt the consume request until the new
> > > > > > > leader has the latest information?
> > > > > > > As a follow-up question, what are your experiences: does
> > > > > > > a failover in a broker cause bigger than usual churn in
> > > > > > > the consumers? (I'm thinking about the time required to
> > > > > > > rebuild remote index files.)
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Viktor
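Viktor's summary maps to roughly this loop on the follower side; the
interfaces are illustrative stand-ins, not from the KIP:

    import java.util.List;

    // Sketch of the follower behaviour summarized above: periodically
    // scan the remote storage for segments newly copied by the
    // leader's RLM and fold them into the local remote-index, so
    // fetches can be served after a failover.
    public class FollowerRemoteScan {
        interface RemoteStorage { List<String> listSegments(String topicPartition); }
        interface RemoteIndex { boolean contains(String segmentName); void add(String segmentName); }

        static void scanOnce(String topicPartition,
                             RemoteStorage storage,
                             RemoteIndex index) {
            for (String segment : storage.listSegments(topicPartition)) {
                if (!index.contains(segment)) {
                    // Record the new segment's remote position.
                    index.add(segment);
                }
            }
        }
    }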
> > > > > > >
> > > > > > > On Mon, Apr 1, 2019 at 8:49 PM Harsha wrote:
> > > > > > > >
> > > > > > > > Hi Eno,
> > > > > > > >
> > > > > > > > Thanks for the comments. Answers are inline.
> > > > > > > >
> > > > > > > > "Performance & durability
> > > > > > > > ----------------------------------
> > > > > > > > - would be good to have more discussion on performance
> > > > > > > > implications of tiering. Copying the data from the
> > > > > > > > local storage to the remote storage is going to be
> > > > > > > > expensive in terms of network bandwidth and will affect
> > > > > > > > foreground traffic to Kafka, potentially reducing its
> > > > > > > > throughput and latency."
> > > > > > > >
> > > > > > > > Good point. We've run our local tests with 10GigE
> > > > > > > > cards; even though our clients' bandwidth requirements
> > > > > > > > are high, with 1000s of clients producing/consuming
> > > > > > > > data we never hit our limits on network bandwidth. More
> > > > > > > > often we hit CPU and memory limits before network
> > > > > > > > bandwidth. But this is something to be taken care of by
> > > > > > > > the operator if they want to enable tiered storage.
> > > > > > > > Also, as mentioned in the KIP/previous threads, clients
> > > > > > > > requesting older data is very rare and often used as an
> > > > > > > > insurance policy. What is proposed here does increase
> > > > > > > > bandwidth in terms of shipping log segments to remote,
> > > > > > > > but access patterns determine how much we end up
> > > > > > > > reading from the remote tier.
> > > > > > > >
> > > > > > > > "- throttling the copying of the data above might be a
> > > > > > > > solution, however, if you have a few TB of data to move
> > > > > > > > to the slower remote tier the risk is that the movement
> > > > > > > > will never complete on time under high Kafka load. Do
> > > > > > > > we need a scheduler to use idle time to do the copying?"
> > > > > > > >
> > > > > > > > In our design, we are going to have a scheduler in RLM
> > > > > > > > which will periodically copy inactive (rolled-over) log
> > > > > > > > segments. Idle time is not easy to calculate and
> > > > > > > > schedule a copy around; moreover, we want to copy the
> > > > > > > > segments as soon as they are available. Throttling is
> > > > > > > > something we can take into account and provide options
> > > > > > > > to tune.
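That is, roughly a periodic task like the sketch below, with a simple
bytes-per-second cap standing in for the tunable throttle; the names
are illustrative, not from the KIP:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch of the RLM scheduler described above: periodically ship
    // inactive (rolled-over) segments to the remote tier, copying
    // them as soon as they are available rather than waiting for
    // idle time.
    public class RlmCopyScheduler {
        interface SegmentShipper { void shipInactiveSegments(long maxBytesPerSec); }

        static ScheduledExecutorService start(SegmentShipper shipper,
                                              long maxBytesPerSec) {
            ScheduledExecutorService scheduler =
                    Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleWithFixedDelay(
                    () -> shipper.shipInactiveSegments(maxBytesPerSec),
                    0, 30, TimeUnit.SECONDS);
            return scheduler;
        }
    }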
> > > > > > > >
> > > > > > > > "- Have you considered having two options: 1) a slow
> > > > > > > > tier only (e.g., all the data on HDFS) and 2) a fast
> > > > > > > > tier only like Kafka today. This would avoid copying
> > > > > > > > data between the tiers. Customers that can tolerate a
> > > > > > > > slower tier with a better price/GB can just choose
> > > > > > > > option (1). Would be good to put in Alternatives
> > > > > > > > considered."
> > > > > > > >
> > > > > > > > What we want to keep is the Kafka that is known to
> > > > > > > > users today, with local fast disk access and a fast
> > > > > > > > data-serving layer. The tiered storage option might not
> > > > > > > > be for everyone, and most users who are happy with
> > > > > > > > Kafka today shouldn't see changes to their operation
> > > > > > > > because of this KIP. Fundamentally, we believe remote
> > > > > > > > tiered storage data is accessed very infrequently. We
> > > > > > > > expect anyone reading from remote tiered storage to
> > > > > > > > expect a slower read response (mostly backfills).
> > > > > > > > Making an explicit slow/fast-tier split will only cause
> > > > > > > > more confusion and operational complexity. With tiered
> > > > > > > > storage, only users who want cheaper long-term storage
> > > > > > > > need enable it, and others can operate Kafka as it is
> > > > > > > > today. It gives a good balance: the latest reads are
> > > > > > > > served from local disk almost all the time, and older
> > > > > > > > data is shipped to and read from the remote tier when
> > > > > > > > clients need it. If necessary, we can revisit
> > > > > > > > slow/fast-tier options at a later point.
> > > > > > > >
> > > > > > > > "Topic configs
> > > > > > > > ------------------
> > > > > > > > - related to performance but also availability, we need
> > > > > > > > to discuss the replication mode for the remote tier.
> > > > > > > > For example, if the Kafka topics used to have 3-way
> > > > > > > > replication, will they continue to have 3-way
> > > > > > > > replication on the remote tier? Will the user configure
> > > > > > > > that replication? In S3 for example, one can choose
> > > > > > > > from different S3 tiers like STD or SIA, but there is
> > > > > > > > no direct control over the replication factor like in
> > > > > > > > Kafka."
> > > > > > > >
> > > > > > > > No. The remote tier is expected to be reliable storage
> > > > > > > > with its own replication mechanisms.
> > > > > > > >
> > > > > > > > "- how will security and ACLs be configured for the
> > > > > > > > remote tier. E.g., if user A does not have access to a
> > > > > > > > Kafka topic, when that topic is moved to S3 or HDFS
> > > > > > > > there needs to be a way to prevent access to the S3
> > > > > > > > bucket for that user.
> > > > > > > > This might be outside the scope of this KIP but would
> > > > > > > > be good to discuss first."
> > > > > > > >
> > > > > > > > As mentioned in the KIP's Alternatives section, we will
> > > > > > > > keep Kafka as the owner of those files in S3 or HDFS
> > > > > > > > and take advantage of the HDFS security model (file
> > > > > > > > system permissions). So any user who wants to go
> > > > > > > > directly to HDFS and access the files will not be able
> > > > > > > > to read them; all client requests will go through
> > > > > > > > Kafka, and its ACLs will apply like they do for any
> > > > > > > > other request.
> > > > > > > >
> > > > > > > > Hi Ron,
> > > > > > > > Thanks for the comments.
> > > > > > > >
> > > > > > > > "I'm excited about this potential feature. Did you
> > > > > > > > consider storing the information about the remote
> > > > > > > > segments in a Kafka topic as opposed to in the remote
> > > > > > > > storage itself? The topic would need infinite retention
> > > > > > > > (or it would need to be compacted) so as not to itself b
> > [message truncated...]