From: XiangWei Huang <xw.huang.hz@gmail.com>
Subject: Re: got Warn message - "the expected leader session ID did not equal the received leader session ID" when using LocalFlinkMiniCluster to interpret scala code
Date: Wed, 20 Sep 2017 11:46:33 +0800
To: Till Rohrmann
Cc: dev@flink.apache.org, user@flink.apache.org
Hi Till,
Thanks for your answer. It worked when I used StandaloneMiniCluster, but another problem is that I can't find a way to cancel a running Flink job without shutting down the cluster. For LocalFlinkMiniCluster I can do it with the code below:

for (job <- cluster.getCurrentlyRunningJobsJava()) {
  cluster.stopJob(job)
}


Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster?
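For what it's worth, one pattern that may apply here (a hedged sketch, not verified against the StandaloneMiniCluster API discussed in this thread) is to route cancellation through a client handle rather than the cluster object, so the cluster itself keeps running. The helper below uses hypothetical `listRunning`/`cancel` callbacks to stand in for whatever the real client exposes:

```scala
// Sketch of cancelling jobs through a client handle instead of shutting the
// cluster down. `listRunning` and `cancel` are hypothetical stand-ins for
// whatever the actual cluster client exposes (e.g. Flink's ClusterClient has
// a cancel(JobID) method); the cluster itself is never stopped here.
def cancelAllRunning[Job](listRunning: () => Seq[Job], cancel: Job => Unit): Int = {
  val jobs = listRunning()
  jobs.foreach(cancel) // cancel each job individually; no cluster shutdown
  jobs.size            // number of cancel requests issued
}
```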

Best Regards,
XiangWei



On 14 Sep 2017, at 16:58, Till Rohrmann <trohrmann@apache.org> wrote:

Hi XiangWei,

the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster now uses an internal leader election service and assigns leader ids to its components. Since this is an internal service, it is not possible to retrieve this information as it is with the ZooKeeper-based leader election services.

Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and letting the code run there.

Cheers,
Till
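The discard behaviour Till describes can be illustrated with a self-contained sketch (stand-in types, not Flink's actual classes): a component only accepts messages tagged with its current leader session ID, and a client that never learned that ID effectively sends the zero UUID, so its SubmitJob message is dropped with exactly this kind of warning:

```scala
import java.util.UUID

// Stand-in model (not Flink's real classes) of leader-session-ID filtering:
// messages whose session ID does not match the expected one are discarded.
final case class LeaderSessionMessage(leaderSessionId: UUID, payload: String)

class SessionFilteringJobManager(expectedId: UUID) {
  def receive(msg: LeaderSessionMessage): Either[String, String] =
    if (msg.leaderSessionId == expectedId)
      Right(s"accepted: ${msg.payload}")
    else
      Left(s"discarded: expected $expectedId, received ${msg.leaderSessionId}")
}
```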


On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang.hz@gmail.com> wrote:
Dear all,

Below is the code I execute:

import java.io._
import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

import com.netease.atom.common.util.logging.Logging
import com.netease.atom.interpreter.Code.Code
import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
import io.netty.buffer._
import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.client.CliFrontend
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}

import scala.Console
import scala.beans.BeanProperty
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.runtime.AbstractFunction0
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}

class FlinkInterpreter extends Interpreter {
  private var bufferedReader: Option[BufferedReader] = None
  private var jprintWriter: JPrintWriter = _
  private val config = new Configuration
  private var cluster: LocalFlinkMiniCluster = _
  @BeanProperty var imain: IMain = _
  @BeanProperty var flinkILoop: FlinkILoop = _
  private var out: ByteBufOutputStream = null
  private var outBuf: ByteBuf = null
  private var in: ByteBufInputStream = _
  private var isRunning: AtomicBoolean = new AtomicBoolean(false)

  override def isOpen: Boolean = {
    isRunning.get()
  }

  def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
    config.toMap.toMap.foreach(println)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val localCluster = new LocalFlinkMiniCluster(config, false)
    localCluster.start(true)
    val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
    println(s"Starting local Flink cluster (host: localhost, port: ${localCluster.getLeaderRPCPort}).\n")
    ("localhost", localCluster.getLeaderRPCPort, localCluster)
  }

 
  /**
   * Start flink cluster and create interpreter
   */
  override def open: Unit = {
    outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
    out = new ByteBufOutputStream(outBuf)
    in = new ByteBufInputStream(outBuf)
    // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
    val (host, port, localCluster) = startLocalMiniCluster()
    this.cluster = localCluster
    val conf = cluster.configuration
    println(s"Connecting to Flink cluster (host: $host, port: $port)...")
    flinkILoop = new FlinkILoop(host, port, conf, None)
    val settings = new Settings()
    settings.usejavacp.value = true
    settings.Yreplsync.value = true
    flinkILoop.settings_$eq(settings)
    flinkILoop.createInterpreter()
    imain = flinkILoop.intp
    FlinkInterpreter.ourClassloader = imain.classLoader
    val benv = flinkILoop.scalaBenv
    val senv = flinkILoop.scalaSenv
    benv.getConfig.disableSysoutLogging()
    senv.getConfig.disableSysoutLogging()
    // import libraries
    imain.interpret("import scala.tools.nsc.io._")
    // imain.interpret("import Properties.userHome")
    imain.interpret("import scala.compat.Platform.EOL")
    imain.interpret("import org.apache.flink.api.scala._")
    imain.interpret("import org.apache.flink.api.common.functions._")
    isRunning.set(true)
  }

  override def interpret(line: String): InterpreterResult = {
    if (line == null || line.trim.length == 0) {
      return new InterpreterResult(Code.SUCCESS)
    }
    interpret(line.split("\n"))
  }

  /**
   * Interpret code
   * @param lines
   * @return
   */
  def interpret(lines: Array[String]): InterpreterResult = {
    val imain: IMain = getImain
    val linesToRun: Array[String] = new Array[String](lines.length + 1)
    for (i <- 0 until lines.length) {
      linesToRun(i) = lines(i)
    }
    linesToRun(lines.length) = "print(\"\")"
    System.setOut(new PrintStream(out))
    out.buffer().clear()
    var r: Code = null
    var incomplete: String = ""
    var inComment: Boolean = false
    for (l <- 0 until linesToRun.length) {
      val s: String = linesToRun(l)
      var continuation: Boolean = false
      if (l + 1 < linesToRun.length) {
        val nextLine: String = linesToRun(l + 1).trim
        if (nextLine.isEmpty ||
            nextLine.startsWith("//") ||
            nextLine.startsWith("}") ||
            nextLine.startsWith("object")) {
          continuation = true
        } else if (!inComment && nextLine.startsWith("/*")) {
          inComment = true
          continuation = true
        } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
          inComment = false
          continuation = true
        } else if (nextLine.length > 1 &&
            nextLine.charAt(0) == '.' &&
            nextLine.charAt(1) != '.' &&
            nextLine.charAt(1) != '/') {
          continuation = true
        } else if (inComment) {
          continuation = true
        }
        if (continuation) {
          incomplete += s + "\n"
        }
      }
      if (!continuation) {
        val currentCommand: String = incomplete
        var res: Results.Result = null
        try {
          res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
            override def apply() = {
              imain.interpret(currentCommand + s)
            }
          }.apply())
        } catch {
          case e: Exception =>
            logError("Interpreter Exception ", e)
            return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
        }
        r = getResultCode(res)
        if (r == Code.ERROR) {
          return new InterpreterResult(r, out.toString)
        } else if (r eq Code.INCOMPLETE) {
          incomplete += s + "\n"
        } else {
          incomplete = ""
        }
      }
    }

    if (r eq Code.INCOMPLETE) {
      return new InterpreterResult(r, "Incomplete expression")
    } else {
      return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
    }
  }

  private def getResultCode(r: Results.Result): Code = {
    if (r.isInstanceOf[Results.Success.type]) {
      return Code.SUCCESS
    } else if (r.isInstanceOf[Results.Incomplete.type]) {
      return Code.INCOMPLETE
    } else {
      return Code.ERROR
    }
  }

}

object FlinkInterpreter extends Logging {
  var ourClassloader: ClassLoader = _

  def main(args: Array[String]): Unit = {
    val interpreter: FlinkInterpreter = new FlinkInterpreter
    val code =
      """
        |val dataStream = senv.fromElements(1,2,3,4,5)
        |dataStream.countWindowAll(2).sum(0).print()
        |senv.execute("My streaming program")
      """.stripMargin
    interpreter.open
    val result = interpreter.interpret(code)
  }
}
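As an aside for anyone reading along: the statement-grouping heuristic inside interpret(lines) can be exercised in isolation. The following is a simplified, self-contained sketch (the helper name groupStatements is hypothetical, and only the leading-dot continuation rule is kept) showing why chained calls such as .sum(0) end up in the same interpreter command as the line before them:

```scala
// Simplified sketch of the continuation heuristic in interpret(lines):
// a line is glued to its successor when the next trimmed line starts with
// '.' but not with ".." or "./", so method chains stay in one command.
// groupStatements is a hypothetical name, not part of the original class.
def groupStatements(lines: Array[String]): List[String] = {
  val grouped = scala.collection.mutable.ListBuffer[String]()
  var pending = ""
  for (i <- lines.indices) {
    val joined = pending + lines(i)
    val continues = i + 1 < lines.length && {
      val next = lines(i + 1).trim
      next.length > 1 && next.charAt(0) == '.' &&
        next.charAt(1) != '.' && next.charAt(1) != '/'
    }
    if (continues) pending = joined + "\n"   // keep accumulating the chain
    else { grouped += joined; pending = "" } // chain (or single line) complete
  }
  grouped.toList
}
```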

The error messages I got are:
…
[WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
[INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
[INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
[INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
[INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
  at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
  at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
  at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
  at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
  at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
  ... 34 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
  at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
  at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
  ... 41 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
  at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
  at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
  at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
  at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





