flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Checkpointing with RocksDB as statebackend
Date Tue, 14 Mar 2017 16:39:16 GMT
The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756

On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <s.richter@data-artisans.com> wrote:

> Hi Vinay,
>
> I think the issue is tracked here:
> https://github.com/facebook/rocksdb/issues/1988
>
> Best,
> Stefan
>
> On 14.03.2017 at 15:31, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:
>
> Hi Stephan,
>
> Is there a ticket number or link to track this? My job meets all the conditions
> you mentioned.
>
> Thanks,
> Vishnu
>
> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi Vinay!
>>
>> We just discovered a bug in RocksDB. The bug affects windows without
>> reduce() or fold(), windows with evictors, and ListState.
>>
>> Once the state stored per key exceeds a certain size, a particular access
>> pattern in RocksDB becomes so slow that it basically brings down the
>> streaming program and the snapshots.
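
To illustrate why only some window shapes are affected, here is a minimal sketch in plain Python. It is not Flink code, and the function names `buffer_all` and `reduce_incrementally` are hypothetical. A window without reduce() or fold() has to keep every element per key, as ListState does, whereas an incrementally aggregated window keeps a single running value per key:

```python
from collections import defaultdict


def buffer_all(events):
    """List-style window state: every element is appended under its key."""
    state = defaultdict(list)
    for key, value in events:
        state[key].append(value)
    return state


def reduce_incrementally(events, reduce_fn):
    """Reduce-style window state: one running value is kept per key."""
    state = {}
    for key, value in events:
        state[key] = reduce_fn(state[key], value) if key in state else value
    return state


# Hypothetical input: 1000 events that all belong to the same key.
events = [("sensor-1", v) for v in range(1000)]

buffered = buffer_all(events)
reduced = reduce_incrementally(events, lambda a, b: a + b)

# The buffered state grows with the number of elements per key ...
print(len(buffered["sensor-1"]))
# ... while the reduced state stays a single value.
print(reduced["sensor-1"])
```

The per-key size of the buffered variant grows with every element, which is the kind of state the size-per-key slowdown described above applies to.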
>>
>> We are reaching out to the RocksDB folks and looking for workarounds in
>> Flink.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> @vinay  Can you try not setting the buffer timeout at all? I am actually
>>> not sure what the effect of setting it to a negative value would be; that
>>> could be a cause of problems...
>>>
>>>
>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <swiesman@mediamath.com>
>>> wrote:
>>>
>>>> Vinay,
>>>>
>>>>
>>>>
>>>> The bucketing sink performs rename operations during the checkpoint, and
>>>> if it tries to rename a file that is not yet consistent, that causes a
>>>> FileNotFound exception, which fails the checkpoint.
>>>>
>>>>
>>>>
>>>> Stephan,
>>>>
>>>>
>>>>
>>>> Currently my AWS fork contains some very specific assumptions about the
>>>> pipeline that will in general only hold for my pipeline. This is because
>>>> I still had some open questions about how to solve
>>>> consistency issues in the general case. I will comment on the Jira issue
>>>> with more specifics.
>>>>
>>>>
>>>>
>>>> Seth Wiesman
>>>>
>>>>
>>>>
>>>> *From: *vinay patil <vinay18.patil@gmail.com>
>>>> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
>>>> *Date: *Monday, February 27, 2017 at 1:05 PM
>>>> *To: *"user@flink.apache.org" <user@flink.apache.org>
>>>>
>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>
>>>>
>>>>
>>>> Hi Seth,
>>>>
>>>> Thank you for your suggestion.
>>>>
>>>> But if the issue is only related to S3, then why does this also happen when
>>>> I replace the S3 sink with HDFS (for checkpointing I am using HDFS
>>>> only)?
>>>>
>>>> Stephan,
>>>>
>>>> Another issue I see is that when I set env.setBufferTimeout(-1) and keep
>>>> the checkpoint interval at 10 minutes, nothing gets
>>>> written to the sink (tried with S3 as well as HDFS); at least I was expecting
>>>> pending files here.
>>>>
>>>> This issue gets worse when checkpointing is disabled, as nothing is
>>>> written.
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Vinay Patil
>>>>
>>>>
>>>>
>>>> On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User
>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>> Hi Seth!
>>>>
>>>>
>>>>
>>>> Wow, that is an awesome approach.
>>>>
>>>>
>>>>
>>>> We have actually seen these issues as well and we are looking to
>>>> eventually implement our own S3 file system (and circumvent Hadoop's S3
>>>> connector that Flink currently relies on):
>>>> https://issues.apache.org/jira/browse/FLINK-5706
>>>>
>>>>
>>>>
>>>> Do you think your patch would be a good starting point for that and
>>>> would you be willing to share it?
>>>>
>>>>
>>>>
>>>> The Amazon AWS SDK for Java is Apache 2 licensed, so it is possible
>>>> to fork it officially, if necessary...
>>>>
>>>>
>>>>
>>>> Greetings,
>>>>
>>>> Stephan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
>>>>
>>>> Just wanted to throw in my 2cts.
>>>>
>>>>
>>>>
>>>> I’ve been running pipelines with similar state size using RocksDB which
>>>> externalize to S3 and bucket to S3. I was getting stalls like this and
>>>> ended up tracing the problem to S3 and the bucketing sink. The solution was
>>>> twofold:
>>>>
>>>>
>>>>
>>>> 1)       I forked hadoop-aws and have it treat Flink as the source of
>>>> truth. EMR uses a DynamoDB table to determine if S3 is inconsistent.
>>>> Instead, I say that if Flink believes that a file exists on S3 and we don’t
>>>> see it, then I trust that Flink is in a consistent state and S3
>>>> is not. In this case, various operations perform a back-off and retry
>>>> up to a certain number of times.
>>>>
>>>>
>>>>
>>>> 2)       The bucketing sink performs multiple renames over the
>>>> lifetime of a file, occurring when a checkpoint starts and then again on
>>>> notification after it completes. Due to S3’s consistency guarantees, the
>>>> second rename of a file can never be assured to work and will eventually fail
>>>> either during or after a checkpoint. Because there is no upper bound on the
>>>> time it takes for a file on S3 to become consistent, retries cannot
>>>> solve this specific problem: a rename could take many minutes,
>>>> which would stall the entire pipeline. The only viable solution I
>>>> could find was to write a custom sink which understands S3. Each writer
>>>> writes a file locally and then copies it to S3 on checkpoint. By only
>>>> interacting with S3 once per file, it can circumvent consistency issues
>>>> altogether.
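
The two ideas above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the fork or the custom sink described in the mail. The names `retry_with_backoff` and `LocalThenCopyWriter` are hypothetical, a local directory stands in for the S3 bucket, and failure recovery is not handled:

```python
import os
import shutil
import tempfile
import time


def retry_with_backoff(operation, max_attempts=5, initial_delay=0.01,
                       sleep=time.sleep):
    """Call operation() until it succeeds or max_attempts is reached.

    The delay doubles after every failed attempt (exponential back-off).
    """
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except FileNotFoundError:
            if attempt == max_attempts:
                raise  # give up: the file never became visible
            sleep(delay)
            delay *= 2


class LocalThenCopyWriter:
    """Writes records to a local file and copies it to the target once."""

    def __init__(self, target_dir):
        self.target_dir = target_dir  # assumption: stands in for an S3 bucket
        self.part = 0
        self._open_local_file()

    def _open_local_file(self):
        fd, self.local_path = tempfile.mkstemp(suffix=".part")
        self.local_file = os.fdopen(fd, "w")

    def write(self, record):
        self.local_file.write(record + "\n")

    def on_checkpoint(self):
        """Close the local file and copy it to the target exactly once."""
        self.local_file.close()
        destination = os.path.join(self.target_dir, f"part-{self.part}")
        shutil.copyfile(self.local_path, destination)
        os.remove(self.local_path)
        self.part += 1
        self._open_local_file()
        return destination


# Item 1: a lookup that only succeeds on the third attempt, mimicking a file
# that the store does not list yet.
attempts = {"count": 0}


def flaky_lookup():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise FileNotFoundError("not visible yet")
    return "visible"


result = retry_with_backoff(flaky_lookup, sleep=lambda seconds: None)

# Item 2: the target is touched once per file, with no rename afterwards.
target = tempfile.mkdtemp()
writer = LocalThenCopyWriter(target)
writer.write("a")
writer.write("b")
copied = writer.on_checkpoint()
```

Because the target only ever sees one copy per file and no rename, the sketch never depends on a previously written object becoming visible.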
>>>>
>>>>
>>>>
>>>> Hope this helps,
>>>>
>>>>
>>>>
>>>> Seth Wiesman
>>>>
>>>>
>>>>
>>>> *From: *vinay patil <[hidden email]>
>>>> *Reply-To: *"[hidden email]" <[hidden email]>
>>>> *Date: *Saturday, February 25, 2017 at 10:50 AM
>>>> *To: *"[hidden email]" <[hidden email]>
>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>
>>>>
>>>>
>>>> Hi Stephan,
>>>>
>>>> Just to avoid confusion here: I am using an S3 sink for writing the
>>>> data, and HDFS for storing checkpoints.
>>>>
>>>> There are two core nodes (HDFS) and two task nodes on EMR.
>>>>
>>>>
>>>> I replaced the S3 sink with HDFS for writing data in my last test.
>>>>
>>>> Let's say the checkpoint interval is 5 minutes. Within 5 minutes of
>>>> running, the state size grows to 30 GB. On checkpointing, the 30 GB of state
>>>> maintained in RocksDB has to be copied to HDFS, right? Is this causing
>>>> the pipeline to stall?
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Vinay Patil
>>>>
>>>>
>>>>
>>>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>>>
>>>> Hi Stephan,
>>>>
>>>> To verify whether S3 is making the pipeline stall, I have replaced the S3
>>>> sink with HDFS and set the minimum pause between checkpoints to 5 minutes;
>>>> I still see the same issue, with checkpoints failing.
>>>>
>>>> If I set the pause time to 20 seconds, all checkpoints complete;
>>>> however, there is a hit in overall throughput.
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Vinay Patil
>>>>
>>>>
>>>>
>>>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User
>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>> Flink's state backends currently do a good number of "make sure this
>>>> exists" operations on the file systems. Through Hadoop's S3 filesystem,
>>>> that translates to S3 bucket list operations, where there is a limit on how
>>>> many operations may happen per time interval. After that, S3 blocks.
>>>>
>>>>
>>>>
>>>> It seems that operations that are totally cheap on HDFS are hellishly
>>>> expensive (and limited) on S3. It may be that you are affected by that.
>>>>
>>>>
>>>>
>>>> We are gradually trying to improve the behavior there and be more S3
>>>> aware.
>>>>
>>>>
>>>>
>>>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>> Stephan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:
>>>>
>>>> Hi Stephan,
>>>>
>>>> So do you mean that S3 is causing the stall? As I mentioned in my
>>>> previous mail, I could not see any progress for 16 minutes as checkpoints
>>>> were failing continuously.
>>>>
>>>>
>>>>
>>>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing
>>>> List archive.]" <[hidden email]> wrote:
>>>>
>>>> Hi Vinay!
>>>>
>>>>
>>>>
>>>> True, the operator state (like Kafka) is currently not asynchronously
>>>> checkpointed.
>>>>
>>>>
>>>>
>>>> While it is rather small state, we have seen before that on S3 it can
>>>> cause trouble, because S3 frequently stalls uploads of even data amounts as
>>>> low as kilobytes due to its throttling policies.
>>>>
>>>>
>>>>
>>>> That would be a super important fix to add!
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>> Stephan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have attached a snapshot for reference:
>>>> As you can see, all 3 checkpoints failed; checkpoint IDs 2 and 3 are
>>>> stuck at the Kafka source after 50%.
>>>> (The data sent so far by Kafka source 1 is 65 GB, and by source 2
>>>> is 15 GB.)
>>>>
>>>> Within 10 minutes 15M records were processed, and for the next 16 minutes
>>>> the pipeline has been stuck; I don't see any progress beyond 15M because
>>>> checkpoints are failing consistently.
>>>>
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
>>>>
>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>> archive at Nabble.com.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
>
