flink-user mailing list archives

From Flink Developer <developer...@protonmail.com>
Subject Re: Flink Kafka to BucketingSink to S3 - S3Exception
Date Sun, 04 Nov 2018 10:09:21 GMT
Hi Ravi, some questions:

- Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, flink-statebackend-rocksdb,
hadoop-common, hadoop-aws, hadoop-hdfs)? If so, could you please share your
dependency versions?
- Does this use a kafka source with high flink parallelism (~400) for all kafka partitions
and does it run continuously for several days?
- Could you please share your checkpoint interval, batch file size, batch rollover
interval, and sink prefix (s3:// or s3a://)?

Thank you
‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <ravibhushanratnakar@gmail.com>
wrote:

> I have made a few changes to BucketingSink and implemented them as a new CustomBucketingSink
for my project, which works fine with both the s3 and s3a protocols. This implementation doesn't
require an XML configuration file; instead, it uses the configuration provided in a Flink configuration
object by calling the setConfig method of BucketingSink.
>
> On Sat 3 Nov, 2018, 09:24 Flink Developer <developer143@protonmail.com> wrote:
>
>> It seems the issue also appears when using Flink version 1.6.2.
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <developer143@protonmail.com>
wrote:
>>
>>> Hi, thanks for the info Rafi, that seems to be related.  I hope Flink version
1.6.2 fixes this. Has anyone encountered this before?
>>>
>>> I would also like to note that my jar includes a core-site.xml file that uses
*s3a*. Is this the recommended configuration to use with BucketingSink? Should the sink
be specified using s3a://<bucket>/<prefix> or s3://<bucket>/<prefix>?
>>>
>>> - <configuration>
>>> -     <property>
>>> -         <name>fs.s3.impl</name>
>>> -         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.buffer.dir</name>
>>> -         <value>/tmp</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.access.key</name>
>>> -         <value>xxxxx</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.secret.key</name>
>>> -         <value>xxxxx</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.buffer.dir</name>
>>> -         <value>/tmp</value>
>>> -     </property>
>>> - </configuration>
>>>
>>> And my pom.xml uses:
>>>
>>> - <artifactId>flink-s3-fs-hadoop</artifactId>
>>> - ...
>>> - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>> - ...
>>> - <artifactId>hadoop-hdfs</artifactId>
>>> - ...
>>> - <artifactId>hadoop-common</artifactId>
>>> - ...
>>> - <artifactId>hadoop-core</artifactId>
>>> - ...
>>> - <artifactId>hadoop-aws</artifactId>
>>> - ...
>>>
>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <rafi.aroch@gmail.com>
wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm also experiencing this with Flink 1.5.2. This is probably related to
BucketingSink not working properly with S3 as the filesystem because of the eventual consistency
of S3.
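
To illustrate the eventual-consistency point above: the stack trace later in this thread shows rename() failing with a 404 from a metadata lookup. The following is only a minimal sketch of the general retry-with-backoff idea for such lookups. It is not what BucketingSink does and not the change made in FLINK-9752. The class name RetryOnNotFound, the method callWithRetry, and the use of FileNotFoundException as a stand-in for the S3 404 are all assumptions made for this example.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

public class RetryOnNotFound {

    /**
     * Runs the action and retries only when it throws FileNotFoundException,
     * doubling the delay between attempts. Hypothetical helper, not a Flink API.
     */
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (FileNotFoundException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last "not found"
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated store: the object only becomes visible on the third lookup.
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new FileNotFoundException("404 Not Found (simulated)");
            }
            return "renamed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Retrying only hides short visibility delays. It does not help if the object was never written, so the attempt count should stay small.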
>>>>
>>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part
of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not presto).
>>>>
>>>> Does anyone know if this fix would solve this issue?
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <developer143@protonmail.com>
wrote:
>>>>
>>>>> Hi, I'm running a Scala Flink app in an AWS EMR cluster (EMR 5.17, Hadoop
2.8.4) with Flink parallelism set to 400. The source is a Kafka topic and the app sinks to S3
in the format: s3://<day>/<hour>/<worker_number>/<files>. There are
potentially 400 files being written simultaneously.
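
As a rough illustration of the path layout described above, the sketch below builds a <day>/<hour>/<worker_number> directory from a timestamp and a worker index. The class BucketPathSketch, the method bucketPath, the base path, the date patterns, and the use of UTC are all assumptions made for this example. The original message does not state them, and this is not BucketingSink code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketPathSketch {

    // Assumed patterns; the thread only says "<day>" and "<hour>".
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    /** Returns base/<day>/<hour>/<worker> for the given time and worker index. */
    public static String bucketPath(String base, Instant time, int worker) {
        return base + "/" + DAY.format(time) + "/" + HOUR.format(time) + "/" + worker;
    }

    public static void main(String[] args) {
        // Hypothetical bucket and prefix, for illustration only.
        String base = "s3a://example-bucket/events";
        Instant t = Instant.parse("2018-10-28T00:08:00Z");
        System.out.println(bucketPath(base, t, 17));
        // prints s3a://example-bucket/events/2018-10-28/00/17
    }
}
```

With a parallelism of 400, a layout like this yields up to 400 worker directories per hour, each with its own in-progress file.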
>>>>>
>>>>> Configuration:
>>>>> - Flink v1.5.2
>>>>> - Checkpointing enabled w/ RocksDB (flink-statebackend-rocksdb_2.11,
v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause between checkpoints
is 2 mins. Timeout is set to 2 mins.
>>>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>>>> - Batch file size is set to 5mb.
>>>>> - Batch rollover interval is set to 30min
>>>>> - Writer uses GZip compression
>>>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1,
hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>>>
>>>>> The app is able to run for hours straight, but occasionally (once or
twice a day) it throws the following exception. When this happens, the app is able to recover
from the previous checkpoint, but I am concerned about the exception:
>>>>>
>>>>> Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended
Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>>>>
>>>>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended
Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
>>>>>
>>>>> And sometimes, it will show this:
>>>>>
>>>>> - java.lang.RuntimeException: Error while restoring BucketingSink state.
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
>>>>>
>>>>> - at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>>>
>>>>> - at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>>>
>>>>> - at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>>
>>>>> - at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>>>>
>>>>> - at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>>>>>
>>>>> - at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>>>>>
>>>>> - at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>
>>>>> What causes this and how can it be resolved? Thank you.
>>>>>
>>>>> There seems to be a related Flink ticket and PR here, but I'm not sure
if this is the exact same issue and if it has been resolved:
>>>>> https://issues.apache.org/jira/browse/FLINK-6306
>>>>> https://github.com/apache/flink/pull/3752
>>>>> https://github.com/apache/flink/pull/4607