flink-user mailing list archives

From vino yang <yanghua1...@gmail.com>
Subject Re: Flink 1.5 batch job fails to start
Date Tue, 24 Jul 2018 15:32:28 GMT
Hi Alex,

Based on your log, the likely cause is a Hadoop version mismatch: this
exception can occur when the client and cluster use different Hadoop
versions. I suggest you align the Hadoop version on both sides.

You can:

1. Upgrade the Hadoop version that your Flink cluster depends on; Flink's
official website provides a binary bundled with Hadoop 2.8. [1]
2. Downgrade your fat jar's Hadoop client dependency to match the Flink
cluster's Hadoop dependency version.
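Option 2 could look roughly like the following pom.xml fragment. This is only a sketch assuming a Maven build: the hadoop-aws artifact comes from Alex's description, 2.7.3 is the version reported by the cluster's entrypoint log, and the property name is illustrative.

```xml
<!-- Sketch: pin the Hadoop client dependency to the cluster's 2.7.3.
     Match whatever Hadoop artifacts your fat jar actually pulls in. -->
<properties>
  <hadoop.version>2.7.3</hadoop.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
</dependencies>
```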

[1]:
http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
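The mismatch itself can be read straight off the entrypoint log; a small shell sketch (the log line and both version numbers are copied from this thread, not detected automatically):

```shell
# Compare the cluster-side Hadoop version (from the entrypoint log)
# with the version the fat jar was built against (hadoop-aws-2.8.0).
log_line='INFO   Hadoop version: 2.7.3'
cluster_version=$(echo "$log_line" | sed -n 's/.*Hadoop version: //p')
client_version='2.8.0'

if [ "$cluster_version" != "$client_version" ]; then
  echo "mismatch: cluster=$cluster_version client=$client_version"
fi
# prints: mismatch: cluster=2.7.3 client=2.8.0
```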

Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <alvinnik.g@gmail.com>:

> Hi Till,
>
> Thanks for responding. Below are the entrypoint logs. One thing I noticed
> is that they say "Hadoop version: 2.7.3". My fat jar is built with the
> Hadoop 2.8 client. Could that be the reason for the error? If so, how can
> I use the same Hadoop version (2.8) on the Flink server side? BTW, the
> job runs fine locally, reading from the same s3a buckets, when executed
> using createLocalEnvironment via java -jar my-fat.jar --input s3a://foo
> --output s3a://bar
>
> Regarding the Java version: the job is submitted via the Flink UI, so
> that should not be a problem.
>
> Thanks a lot in advance.
>
> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Alex,
>>
>> I'm not entirely sure what causes this problem because it is the first
>> time I see it.
>>
>> First question would be if the problem also arises if using a different
>> Hadoop version.
>>
>> Are you using the same Java versions on the client as well as on the
>> server?
>>
>> Could you provide us with the cluster entrypoint logs?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <alvinnik.g@gmail.com> wrote:
>>
>>> Trying to migrate an existing job that runs fine on Flink 1.3.1 to
>>> Flink 1.5 and getting a weird exception.
>>>
>>> The job reads JSON from s3a and writes Parquet files to s3a with an
>>> Avro model. The job is an uber jar built with hadoop-aws-2.8.0 in order
>>> to have access to the S3AFileSystem class.
>>>
>>> It fails here:
>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>> with:
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>
>>> To be exact, it fails right on this line:
>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>
>>> Not sure how to resolve this problem. Looking for advice. Let me know
>>> if more info is needed. The full stack trace is below. Thanks.
>>>
>>> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>> at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>> at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>> at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>> ... 29 more
>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> ... 4 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>> ... 21 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>> ... 26 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>> ... 31 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>> at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>> ... 32 more
>>>
>>>
