flink-user mailing list archives

From Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Subject Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Date Sat, 13 Jun 2020 08:57:02 GMT
Hi Yun, the client logs the INFO message "class org.joda.time.DateTime
cannot be used as a POJO type because not all fields are valid POJO
fields, and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect
on performance". However, I cannot submit my job. This is strange,
because I can start and run it in IntelliJ, but not on the standalone
cluster on my machine.
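
As a rough illustration of what this INFO message is about, here is a
minimal sketch of a POJO-style field check. It is a simplified
approximation and not Flink's actual TypeExtractor logic: it only tests
that each non-static, non-transient field is either public or has a
public getter and setter named after the field. The names
PojoFieldCheck, MillisHolder and Ride are hypothetical; MillisHolder
merely mimics a class that keeps its state in a private field iMillis
without matching accessors.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    // Hypothetical stand-in: private field, accessor name does not match the field.
    public static class MillisHolder {
        private long iMillis;
        public long getMillis() { return iMillis; }
    }

    // Hypothetical class that passes the simplified rule.
    public static class Ride {
        public long rideId;
        private String driver;
        public Ride() {}
        public String getDriver() { return driver; }
        public void setDriver(String driver) { this.driver = driver; }
    }

    // Returns the names of fields that are neither public nor covered by
    // a public get<Field>/set<Field> pair (simplified rule, see above).
    public static List<String> invalidFields(Class<?> clazz) {
        List<String> invalid = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mod = field.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || field.isSynthetic()) {
                continue; // not part of the serialized state in this sketch
            }
            if (Modifier.isPublic(mod)) {
                continue; // public fields need no accessors
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix, 0)
                    || !hasPublicMethod(clazz, "set" + suffix, 1)) {
                invalid.add(name);
            }
        }
        return invalid;
    }

    // True if the class exposes a public method with this name and parameter count.
    private static boolean hasPublicMethod(Class<?> clazz, String name, int paramCount) {
        for (Method method : clazz.getMethods()) {
            if (method.getName().equals(name) && method.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("MillisHolder invalid fields: " + invalidFields(MillisHolder.class));
        System.out.println("Ride invalid fields: " + invalidFields(Ride.class));
    }
}
```

According to the message itself, a type that fails the real check is
processed as a GenericType with an effect on performance, and this is
reported at INFO level.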

2020-06-13 10:50:56,051 INFO  org.apache.flink.core.fs.FileSystem
                     - Hadoop is not in the classpath/dependencies.
The extended set of supported File Systems via Hadoop is not
available.
2020-06-13 10:50:56,143 INFO
org.apache.flink.runtime.security.modules.HadoopModuleFactory  -
Cannot create Hadoop Security Module because Hadoop cannot be found in
the Classpath.
2020-06-13 10:50:56,164 INFO
org.apache.flink.runtime.security.modules.JaasModule          - Jaas
file will be created as /tmp/jaas-837993701496785981.conf.
2020-06-13 10:50:56,169 INFO
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
 - Cannot install HadoopSecurityContext because Hadoop cannot be found
in the Classpath.
2020-06-13 10:50:56,169 WARN
org.apache.flink.runtime.security.SecurityUtils               - Unable
to install incompatible security context factory
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
2020-06-13 10:50:56,171 INFO  org.apache.flink.client.cli.CliFrontend
                     - Running 'run' command.
2020-06-13 10:50:56,242 INFO  org.apache.flink.client.cli.CliFrontend
                     - Building program from JAR file
2020-06-13 10:50:57,084 INFO  org.apache.flink.client.ClientUtils
                     - Starting program (detached: false)
2020-06-13 10:50:59,021 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a getter for field iMillis
2020-06-13 10:50:59,021 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a setter for field iMillis
2020-06-13 10:50:59,021 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - Class
class org.joda.time.DateTime cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
2020-06-13 10:50:59,028 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a getter for field iMillis
2020-06-13 10:50:59,028 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a setter for field iMillis
2020-06-13 10:50:59,028 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - Class
class org.joda.time.DateTime cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
2020-06-13 10:53:19,508 WARN  org.apache.flink.util.ExecutorUtils
                     - ExecutorService did not terminate in time.
Shutting it down now.
2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend
                     - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException:
java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759)
... 17 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Could not
upload job files.
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167)
... 7 more
Caused by: java.io.IOException: Could not connect to BlobServer at
address localhost/192.168.56.1:35193
at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167)
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76)
... 8 more
Caused by: java.net.ConnectException: Connection timed out (Connection
timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
... 10 more
]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 4 more
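
The innermost cause in the trace above is a connection timeout to the
BlobServer at localhost/192.168.56.1:35193, so one thing worth checking
is whether that address and port are reachable from the machine that
submits the job. The following is a minimal sketch of such a check
using only the JDK. The class name PortProbe, the method canConnect and
the 5-second timeout are illustrative assumptions, and the host and
port should be taken from the error message of the current run, since
they may differ between cluster starts.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    // Tries to open a TCP connection and reports whether it succeeded
    // within the given timeout. Any I/O failure (timeout, refused
    // connection, unknown host) is reported as "not reachable".
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java PortProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        boolean reachable = canConnect(host, port, 5000); // 5 s timeout (assumed value)
        System.out.println(host + ":" + port + (reachable ? " is reachable" : " is NOT reachable"));
    }
}
```

It could be run as "javac PortProbe.java && java PortProbe 192.168.56.1 35193"
while the cluster is up. If the port is not reachable, possible causes
to look at are how "localhost" resolves on this machine and whether a
firewall blocks the port. This is a suggestion, not a confirmed
diagnosis.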
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <yungao.gy@aliyun.com> wrote:
>
> Hi Felipe,
>
>    I tested the basic RideCleansingExercise[1] job that uses the TaxiRide type locally, and it seems to start up normally.
>
>    Could you also share the code you are currently running and the full stack trace of the exception?
>
> Best,
>  Yun
>
>  [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
>
> ------------------Original Mail ------------------
> Sender:Felipe Gutierrez <felipe.o.gutierrez@gmail.com>
> Send Date:Fri Jun 12 23:11:28 2020
> Recipients:user <user@flink.apache.org>
> Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
>>
>> Hi,
>>
>> I am using the Flink training exercise TaxiRide [1] to run a
>> streaming count of events. On the cluster and on my local machine I
>> receive the message that Joda-Time cannot be serialized: "class
>> org.joda.time.LocalDateTime is not a valid POJO type". However, the
>> job starts on the cluster, but not on my local machine. So I
>> searched the internet, and the suggestion is to register the
>> Joda-Time classes on the environment [2]. I did it like this:
>>
>> env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
>> AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
>> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
>> AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
>> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
>> AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
>>
>> and I added the Joda-Time and Avro dependencies to the pom.xml:
>>
>> <dependency>
>> <groupId>joda-time</groupId>
>> <artifactId>joda-time</artifactId>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-avro</artifactId>
>> <version>${project.version}</version>
>> </dependency>
>>
>> I also tested using addDefaultKryoSerializer, but I got the same error.
>> For some reason, it is still not working. Does anyone have a hint
>> about what could be happening?
>>
>> Thanks! Felipe
>> [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
