flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chesnay Schepler <ches...@apache.org>
Subject Re: Executing Flink server From IntelliJ
Date Wed, 19 Jul 2017 10:26:58 GMT
Hello,

this problem is described in 
https://issues.apache.org/jira/browse/FLINK-6689.

Basically, if you want to use the LocalFlinkMiniCluster you should use a 
TestStreamEnvironment instead.
The RemoteStreamEnvironment only works with a proper Flink cluster.

Regards,
Chesnay

On 14.07.2017 15:43, Boris Lublinsky wrote:
> Hi,
> I am trying to upgrade my project from Flink 1.2 to 1.3 and getting 
> problems while trying to run Flink server from my Intellij project. 
> The code
>
> // Execute on the local Flink server - to test queariable state def 
> executeServer() :Unit = {
>
>    // We use a mini cluster here for sake of simplicity, because I don't 
> want // to require a Flink installation to run this demo. Everything 
> should be // contained in this JAR. val port =6124 val parallelism =4 val config =new
Configuration()
>    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
>    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
>    // In a non MiniCluster setup queryable state is enabled by default. config.setBoolean(QueryableStateOptions.SERVER_ENABLE,
true)
>
>    // Create a local Flink server val flinkCluster =new LocalFlinkMiniCluster(config,
false)
>    try {
>      // Start server and create environment flinkCluster.start(true); val env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
port, parallelism)
>      // Build Graph buildGraph(env)
>      env.execute()
>      val jobGraph = env.getStreamGraph.getJobGraph
>      // Submit to the server and wait for completion flinkCluster.submitJobAndWait(jobGraph,
false)
>    }catch {
>      case e:Exception => e.printStackTrace()
>    }
> }
> Worked on version 1.2, but on 1.3 I am getting
>
> 08:41:29,179 INFO 
>  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting 
> FlinkMiniCluster.
> 08:41:29,431 INFO  akka.event.slf4j.Slf4jLogger                       
>          - Slf4jLogger started
> 08:41:29,498 INFO  Remoting                                - Starting 
> remoting
> 08:41:29,730 INFO  Remoting                                - Remoting 
> started; listening on addresses :[akka.tcp://flink@localhost:6124]
> 08:41:29,762 INFO  org.apache.flink.runtime.blob.BlobServer  - Created 
> BLOB server storage directory 
> /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/blobStore-4e626961-9155-47e9-b1b8-f835a8435cfc
> 08:41:29,765 INFO  org.apache.flink.runtime.blob.BlobServer  - Started 
> BLOB server at 0.0.0.0:54319 - max concurrent requests: 50 - max 
> backlog: 1000
> 08:41:29,775 INFO  org.apache.flink.runtime.metrics.MetricRegistry - 
> No metrics reporter configured, no metrics will be exposed/reported.
> 08:41:29,781 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist 
> - Started memory archivist akka://flink/user/archive
> 08:41:29,786 INFO  org.apache.flink.runtime.jobmanager.JobManager  - 
> Starting JobManager at akka.tcp://flink@localhost:6124/user/jobmanager.
> 08:41:29,787 INFO 
>  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
>  - Proposing leadership to contender 
> org.apache.flink.runtime.jobmanager.JobManager@59cd5ef5 @ 
> akka.tcp://flink@localhost:6124/user/jobmanager
> 08:41:29,796 INFO  akka.event.slf4j.Slf4jLogger                       
>          - Slf4jLogger started
> 08:41:29,804 INFO  Remoting                                - Starting 
> remoting
> 08:41:29,813 INFO  Remoting                                - Remoting 
> started; listening on addresses :[akka.tcp://flink@localhost:54320]
> 08:41:29,825 INFO  akka.event.slf4j.Slf4jLogger                       
>          - Slf4jLogger started
> 08:41:29,830 INFO  Remoting                                - Starting 
> remoting
> 08:41:29,836 INFO  Remoting                                - Remoting 
> started; listening on addresses :[akka.tcp://flink@localhost:54321]
> 08:41:29,846 INFO  org.apache.flink.runtime.jobmanager.JobManager  - 
> JobManager akka.tcp://flink@localhost:6124/user/jobmanager was granted 
> leadership with leader session ID 
> Some(61d3ed9b-1c24-4bbf-99ef-c2a891613473).
> 08:41:29,847 INFO 
>  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
>  - Received confirmation of leadership for leader 
> akka.tcp://flink@localhost:6124/user/jobmanager , 
> session=61d3ed9b-1c24-4bbf-99ef-c2a891613473
> 08:41:29,850 INFO 
>  org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - 
> Messages have a max timeout of 10000 ms
> 08:41:29,851 INFO 
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager 
>  - Received leader address but not running in leader ActorSystem. 
> Cancelling registration.
> 08:41:29,855 INFO 
>  org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary 
> file directory '/var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T': 
> total 464 GB, usable 353 GB (76.08% usable)
> 08:41:30,493 INFO 
>  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - 
> Allocated 363 MB for network buffer pool (number of memory segments: 
> 11634, bytes per segment: 32768).
> 08:41:30,506 INFO 
>  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting 
> the network environment and its components.
> 08:41:30,508 INFO 
>  org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
> managed memory to 1145 MB, memory will be allocated lazily.
> 08:41:30,512 INFO 
>  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> uses directory 
> /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-io-9ec461ff-086d-45cd-b69c-e9890217d8fc

> for spill files.
> 08:41:30,514 INFO  org.apache.flink.runtime.metrics.MetricRegistry - 
> No metrics reporter configured, no metrics will be exposed/reported.
> 08:41:30,561 INFO  org.apache.flink.runtime.filecache.FileCache  - 
> User file cache uses directory 
> /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-dist-cache-4c4e9bcf-5a66-43e7-b2e9-244f310c3c4c
> 08:41:30,570 INFO  org.apache.flink.runtime.filecache.FileCache  - 
> User file cache uses directory 
> /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-dist-cache-687f7d57-33d7-4df3-915f-481008043fef
> 08:41:30,575 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> Starting TaskManager actor at akka://flink/user/taskmanager#-1401663761.
> 08:41:30,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> TaskManager data connection information: 
> 15d5b91a66be806304e6fe15fde8c0fe @ localhost (dataPort=-1)
> 08:41:30,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> TaskManager has 4 task slot(s).
> 08:41:30,578 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> Memory usage stats: [HEAP: 391/838/3641 MB, NON HEAP: 25/26/-1 MB 
> (used/committed/max)]
> 08:41:30,582 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> Trying to register at JobManager 
> akka.tcp://flink@localhost:6124/user/jobmanager (attempt 1, timeout: 
> 500 milliseconds)
> 08:41:30,729 INFO  org.apache.flink.runtime.jobmanager.JobManager  - 
> Task Manager Registration but not connected to ResourceManager
> 08:41:30,732 INFO  org.apache.flink.runtime.instance.InstanceManager - 
> Registered TaskManager at localhost 
> (akka.tcp://flink@localhost:54321/user/taskmanager) as 
> 38047c3fc643910d58ecc414e8233f78. Current number of registered hosts 
> is 1. Current number of alive task slots is 4.
> 08:41:30,741 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> Successful registration at JobManager 
> (akka.tcp://flink@localhost:6124/user/jobmanager), starting network 
> stack and library cache.
> 08:41:30,743 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> Determined BLOB server address to be localhost/127.0.0.1:54319. 
> Starting BLOB cache.
> 08:41:30,745 INFO  org.apache.flink.runtime.blob.BlobCache - Created 
> BLOB cache storage directory 
> /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/blobStore-122d4c13-34d1-4c01-8a50-f6dfdae8b06b
> 08:41:30,996 INFO 
>  org.apache.flink.streaming.api.environment.RemoteStreamEnvironment  - 
> Running remotely at localhost:6124
> 08:41:31,085 INFO 
>  org.apache.flink.client.program.StandaloneClusterClient - Starting 
> client actor system.
> 08:41:31,087 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils 
>  - Trying to select the network interface and address to use by 
> connecting to the leading JobManager.
> 08:41:31,087 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils 
>  - TaskManager will try to connect for 10000 milliseconds before 
> falling back to heuristics
> 08:41:31,088 INFO  org.apache.flink.runtime.net.ConnectionUtils  - 
> Retrieved new target address localhost/127.0.0.1:6124.
> 08:41:31,100 INFO  akka.event.slf4j.Slf4jLogger                       
>          - Slf4jLogger started
> 08:41:31,103 INFO  Remoting                                - Starting 
> remoting
> 08:41:31,108 INFO  Remoting                                - Remoting 
> started; listening on addresses :[akka.tcp://flink@localhost:54324]
> 08:41:31,108 INFO 
>  org.apache.flink.client.program.StandaloneClusterClient - Submitting 
> job with JobID: 74abb7674b9522ad3a204a1315cf609e. Waiting for job 
> completion.
> Submitting job with JobID: 74abb7674b9522ad3a204a1315cf609e. Waiting 
> for job completion.
> 08:41:31,113 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - 
> Disconnect from JobManager null.
> 08:41:31,116 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received 
> SubmitJobAndWait(JobGraph(jobId: 74abb7674b9522ad3a204a1315cf609e)) 
> but there is no connection to a JobManager yet.
> 08:41:31,116 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received 
> job Flink Streaming Job (74abb7674b9522ad3a204a1315cf609e).
> 08:41:31,125 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect 
> to JobManager 
> Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771].
> 08:41:31,126 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected 
> to JobManager at 
> Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771] with 
> leader session id 00000000-0000-0000-0000-000000000000.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771] with 
> leader session id 00000000-0000-0000-0000-000000000000.
> 08:41:31,126 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending 
> message to JobManager akka.tcp://flink@localhost:6124/user/jobmanager 
> to submit job Flink Streaming Job (74abb7674b9522ad3a204a1315cf609e) 
> and wait for progress
> 08:41:31,128 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload 
> jar files to job manager akka.tcp://flink@localhost:6124/user/jobmanager.
> 08:41:31,129 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit 
> job to the job manager akka.tcp://flink@localhost:6124/user/jobmanager.
> 08:41:31,146 WARN  org.apache.flink.runtime.jobmanager.JobManager  - 
> Discard message 
> LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 
> 74abb7674b9522ad3a204a1315cf609e),EXECUTION_RESULT_AND_STATE_CHANGES)) 
> because the expected leader session ID 
> 61d3ed9b-1c24-4bbf-99ef-c2a891613473 did not equal the received leader 
> session ID 00000000-0000-0000-0000-000000000000.
> 08:42:30,381 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate 
> JobClientActor.
> 08:42:30,382 INFO 
>  org.apache.flink.runtime.client.JobSubmissionClientActor  - 
> Disconnect from JobManager 
> Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771].
> 08:42:30,391 INFO 
>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down 
> remote daemon.
> 08:42:30,392 INFO 
>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon 
> shut down; proceeding with flushing remote transports.
> 08:42:30,411 INFO 
>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut 
> down.
> org.apache.flink.client.program.ProgramInvocationException: The 
> program execution failed: Couldn't retrieve the JobExecutionResult 
> from the JobManager.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
> at 
> com.lightbend.modelServer.ModelServingKeyedJob$.executeServer(ModelServingKeyedJob.scala:66)
> at 
> com.lightbend.modelServer.ModelServingKeyedJob$.main(ModelServingKeyedJob.scala:39)
> at 
> com.lightbend.modelServer.ModelServingKeyedJob.main(ModelServingKeyedJob.scala)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: 
> Couldn't retrieve the JobExecutionResult from the JobManager.
> at 
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 10 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: 
> Job submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to 
> configure and confirm the job submission.
> at 
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
> at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
> at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
> at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 08:42:30,424 INFO  org.apache.flink.runtime.blob.BlobCache - Shutting 
> down BlobCache
> 08:42:30,433 INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped 
> BLOB server at 0.0.0.0:54319
> 08:42:30,434 INFO 
>  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> removed spill file directory 
> /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-io-9ec461ff-086d-45cd-b69c-e9890217d8fc
>
> Process finished with exit code 0
>
> Any help will be appreciated
>
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com <mailto:boris.lublinsky@lightbend.com>
> https://www.lightbend.com/
>


Mime
View raw message