Hi Till,

Thanks for responding. Below are the entrypoint logs. One thing I noticed is "Hadoop version: 2.7.3", while my fat jar is built with the Hadoop 2.8 client. Could that be the reason for the error? If so, how can I use the same Hadoop version (2.8) on the Flink server side? BTW, the job runs fine locally, reading from the same s3a buckets, when executed using createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar
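
One way to narrow down a version mismatch like this is to check which jar a given class is actually loaded from, both locally and on the server. The following is only a minimal sketch using plain JDK calls; ClassOrigin is a hypothetical helper name, not part of Flink or Hadoop, and the class names to inspect (for example org.apache.hadoop.fs.s3a.S3AFileSystem) are passed as arguments.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports which jar or directory a class was loaded from. */
class ClassOrigin {

    /** Returns the code source location of the class, or a note if none is exposed. */
    static String locate(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap class loader may have no code source.
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Looks the class up by name, so this helper compiles without that class present. */
    static String locate(String className) {
        try {
            return locate(Class.forName(className));
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not found on classpath)";
        }
    }

    public static void main(String[] args) {
        // Each argument is a fully qualified class name to look up.
        for (String name : args) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Running it with the fat jar on the classpath and again with the jars from /opt/flink/lib would show whether the same class resolves to different jars in the two setups. Note that it only inspects the class loader it runs in, so it does not see Flink's per-job user-code class loader.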

Regarding the Java version: the job is submitted via the Flink UI, so it should not be a problem.

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <trohrmann@apache.org> wrote:
Hi Alex,

I'm not entirely sure what causes this problem because this is the first time I have seen it.

The first question would be whether the problem also arises when using a different Hadoop version.

Are you using the same Java versions on the client as well as on the server?

Could you provide us with the cluster entrypoint logs?

Cheers,
Till

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <alvinnik.g@gmail.com> wrote:
I'm trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink 1.5 and am getting a weird exception.

The job reads JSON from s3a and writes Parquet files to s3a using an Avro model. The job is an uber jar built with hadoop-aws-2.8.0 in order to have access to the S3AFileSystem class.

Fails here with
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data

To be exact, it fails right on that line.

I'm not sure how to resolve this problem and am looking for advice. Let me know if more info is needed. The full stack trace is below. Thanks.

org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more
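
The innermost frames above are plain Java deserialization (ObjectInputStream.readObject called from InstantiationUtil.deserializeObject). As a rough illustration of that kind of round trip, here is a minimal JDK-only sketch. RoundTripCheck and LoaderAwareInputStream are hypothetical names, not Flink classes, and resolving classes through an explicit class loader is an assumption about the general technique rather than a copy of Flink's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical helper: serializes an object and reads it back with a chosen class loader. */
class RoundTripCheck {

    /** ObjectInputStream that resolves classes against the given class loader first. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    /** Round trip using the value's own class loader; failures surface as unchecked exceptions. */
    static Object roundTrip(Serializable value) {
        try {
            return deserialize(serialize(value), value.getClass().getClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed for " + value.getClass(), e);
        }
    }
}
```

Calling RoundTripCheck.roundTrip(outputFormat) in the client program only shows whether the object survives serialization within one JVM and one classpath. It cannot by itself reproduce a mismatch between the classes in the fat jar and those under /opt/flink/lib on the server.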