flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ken Krugler <kkrugler_li...@transpac.com>
Subject Batch mode with Flink 1.8 unstable?
Date Mon, 24 Jun 2019 00:32:09 GMT
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly
fails, but for varying reasons.

Has anyone else had stability with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes
see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))
-> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34))
-> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter
(Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor)
-> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877)
switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
	at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting
the next input split failed.
	at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
	at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
	... 3 more
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
	... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d)
switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
	at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting
the next input split failed.
	at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
	at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
	... 3 more
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
	... 4 more

I saw bjbq4d@gmail.com’s email recently about a similar issue:

> I figured this out myself. In my yarn container logs I saw this warning/error,
> 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]:
<akka.tcp://flink@HOST:43911/temp/$n]:> max allowed size 10485760 bytes, actual size
of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
> 
> Looking into this there is a max frame size for Akka which in flink can be set with akka.framesize
and is set to 10MB by default. Increasing this past the size of my side input fixed the issue.
I'm guessing this is due to creating the side input PCollection from memory using the Create.of
APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager
xxx has failed. This might indicate that the remote task manager has been lost”

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941
<https://issues.apache.org/jira/browse/FLINK-10941>, and thus I’d avoid the problem,
but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers
are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers.
One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing
because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some
TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl   
  - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor
exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor
exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                  
  - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0
<akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0>) because the
framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                  
  - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0
<akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0>) because the
framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>.
Number pending requests 1.
2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor
container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container
requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending
container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                  
  - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key
Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317'
has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable   
  - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308,
heapMemoryInMB=2147
483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647},
allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService     
  - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService     
  - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Connecting to ResourceManager akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(0000000000000000000000000
<akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(00000000000000000000000000000000)>0000000)
<akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(00000000000000000000000000000000)>.
2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner               
  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-05-17 17:43:38,988 INFO  org.apache.flink.runtime.blob.PermanentBlobCache           
  - Shutting down BLOB cache
2019-05-17 17:43:38,989 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 - Shutting down TaskExecutorLocalStateStoresManager.
2019-05-17 17:43:38,990 INFO  org.apache.flink.runtime.blob.TransientBlobCache           
  - Shutting down BLOB cache
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache               
  - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache               
  - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Resolved ResourceManager address, beginning registration
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor         
  - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask               
  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133'
has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted
yet.

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators
get stuck (these have a different parallelism, so there’s a rebalance)

In the most recent example of this, they processed about 20% of the data emitted by the long
running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log
shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9)
switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this msg starts appearing in the log file every 5 seconds or
so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl      
  - Updating with new AMRMToken




Mime
View raw message