flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bowen Li <bowen...@offerupnow.com>
Subject Re: Could not flush and close the file system output stream to s3a, is this fixed?
Date Thu, 14 Dec 2017 18:24:43 GMT
Hi,

The problem reported in FLINK-7590 only happened one time on our end. And,
as you can see from its comments,  we suspected it's caused by AWS-SDK or
Hadoop's s3a implementation, which we have no control over.

Flink 1.4.0 has its own S3 implementations. I haven't tried it yet.


On Thu, Dec 14, 2017 at 2:05 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Bowen Li (in CC) closed the issue but there is no fix (or at least it is
> not linked in the JIRA).
> Maybe it was resolved in another issue or can be differently resolved.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian
>
> 2017-12-13 5:28 GMT+01:00 Hao Sun <hasun@zendesk.com>:
>
>> https://issues.apache.org/jira/browse/FLINK-7590
>>
>> I have a similar situation with Flink 1.3.2 on K8S
>>
>> =========
>> 2017-12-13 00:57:12,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3) (6ad009755a6009975d197e75afa05e14)
>> switched from RUNNING to FAILED. AsynchronousException{java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3).} at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970) at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
>> java.lang.Exception: Could not properly cancel managed operator state
>> future. at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:98) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.cleanup(StreamTask.java:1023) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:96) ... 7 more Caused by:
>> java.io.IOException: Could not flush and close the file system output
>> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at org.apache.flink.runtime.state
>> .filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutpu
>> tStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) at
>> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOC
>> allable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:270) at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:233) at
>> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.ca
>> ll(AbstractAsyncIOCallable.java:72) at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Caused by:
>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS
>> Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code:
>> RequestTimeout, AWS Error Message: Your socket connection to the server was
>> not read from or written to within the timeout period. Idle connections
>> will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2
>> cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc= at
>> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>> at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>> at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
>> at com.amazonaws.services.s3.transfer.internal.UploadCallable.u
>> ploadInOneChunk(UploadCallable.java:108) at
>> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>> at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>> at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>> at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>> ... 4 more [CIRCULAR REFERENCE:java.io.IOException: Could not flush and
>> close the file system output stream to s3a://zendesk-euc1-fraud-preve
>> ntion-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0a
>> f/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the
>> stream state handle] 2017-12-13 00:57:12,404 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job KafkaDemo
>> maxwell.tickets (env:production) (d5a8b2ab61625cf0aa1e66360b7ad0af)
>> switched from state RUNNING to FAILING. AsynchronousException{java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3).} at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970) at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
>> java.lang.Exception: Could not properly cancel managed operator state
>> future. at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:98) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.cleanup(StreamTask.java:1023) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:96) ... 7 more Caused by:
>> java.io.IOException: Could not flush and close the file system output
>> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at org.apache.flink.runtime.state
>> .filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutpu
>> tStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) at
>> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOC
>> allable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:270) at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:233) at
>> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.ca
>> ll(AbstractAsyncIOCallable.java:72) at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Caused by:
>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS
>> Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code:
>> RequestTimeout, AWS Error Message: Your socket connection to the server was
>> not read from or written to within the timeout period. Idle connections
>> will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2
>> cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc= at
>> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>> at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>> at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
>> at com.amazonaws.services.s3.transfer.internal.UploadCallable.u
>> ploadInOneChunk(UploadCallable.java:108) at
>> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>> at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>> at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>> at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>> ... 4 more [CIRCULAR REFERENCE:java.io.IOException: Could not flush and
>> close the file system output stream to s3a://zendesk-euc1-fraud-preve
>> ntion-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0a
>> f/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the
>> stream state handle]
>>
>>
>

Mime
View raw message