Date: Fri, 15 Apr 2016 10:07:34 +0200
From: Till Rohrmann
To: dev@flink.apache.org
Subject: Re: Flink Interpreter w/ yarn-session

In HA mode, the host and port information
you provide to the Shell should simply be ignored. So you don't have to
retrieve them from the .yarn-properties file.

Could you maybe run the FlinkInterpreter with debug log level and share the
logs with me? You can also do that privately, if you don't want to share
them on the mailing list.

I haven't tried it myself, but I thought that the Shell also works with an
HA cluster, because it uses the same mechanism as the CLI, for example.
I'll try it out later today.

Cheers,
Till

On Fri, Apr 15, 2016 at 12:22 AM, Andrea Sella wrote:

> Hi Till,
>
> The cluster has started in HA mode.
> I already patched the Flink interpreter to allow passing the Configuration
> to FlinkILoop. Nevertheless, I have to pass host and port to FlinkILoop:
> they are required by the FlinkILoop constructor, and I retrieve them from
> the .yarn-properties file.
>
> I logged the Flink Configuration:
>
> INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2}
> FlinkInterpreter.java[open]:96) - Flink Configuration: {
> recovery.mode=zookeeper, host=yarn,
> yarn-properties=/tmp/.yarn-properties-flink,
> recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181,
> recovery.zookeeper.path.root=/flink/recovery}
>
> and I attach some logs:
>
> Error displayed in paragraph of Zeppelin
>
> JobManager log
>
> Interpreter/FlinkILoop log
>
>
> I was looking at the Flink shell, and it works similarly to the
> interpreter. Does it work with an HA cluster?
>
> Thank you,
> Andrea
>
>
> 2016-04-14 16:09 GMT+02:00 Till Rohrmann :
>
> > Hi Andrea,
> >
> > have you started the Flink Yarn cluster in HA mode? Then the job manager
> > address is stored in ZooKeeper and you have to tell your FlinkILoop that
> > it should retrieve the JobManager address from there.
> > In order to do that, you have to set
> > conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"),
> > conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "address of your zookeeper cluster") and
> > conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "flink dir you've set"),
> > where conf is the Flink configuration object. The values of these
> > configuration keys must match the values specified in the
> > flink-conf.yaml file. You then give the FlinkILoop the conf object.
> >
> > I'm not sure whether you can specify a custom Flink configuration in
> > Zeppelin. I think you can only specify a host and port. So either you
> > start your Flink cluster in non-HA mode or you have to patch Zeppelin.
> >
> > Cheers,
> > Till
> >
> > On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.sella@radicalbit.io>
> > wrote:
> >
> > > Hi,
> > >
> > > I am working on allowing Zeppelin's Flink interpreter to connect to an
> > > existing YARN cluster. The YARN cluster was started via yarn-session,
> > > and the Flink version is 1.0.0.
> > >
> > > My approach is to read host and port from .yarn-properties and pass
> > > them to IFlinkLoop.
> > > Now I am facing an issue with the session ID when I submit a paragraph
> > > to the YARN cluster. The YARN cluster logs a warning similar to:
> > >
> > > 2016-04-12 10:14:32,666 WARN  org.apache.flink.yarn.YarnJobManager
> > >   - Discard message
> > > LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
> > > 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
> > > because the expected leader session ID
> > > Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received
> > > leader session ID None.
> > >
> > > My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException.
> > > Is it maybe due to the missing session ID? Do I need to pass extra
> > > params to connect correctly to the YARN cluster, or are host and port
> > > enough?
> > >
> > > Thanks in advance,
> > > Andrea
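For anyone taking the patched-interpreter route Till describes, here is a minimal sketch, in plain Java, of the step "the values must match the values specified in the flink-conf.yaml file". It is not Flink or Zeppelin code. The class HaConfigSketch and its methods are hypothetical, the parser is a simplified "key: value" line reader rather than Flink's own configuration loader, and the three key strings are the ones visible in Andrea's logged configuration. They are assumed to be what ConfigConstants.RECOVERY_MODE, ZOOKEEPER_QUORUM_KEY and ZOOKEEPER_DIR_KEY resolve to in Flink 1.0.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink or Zeppelin): pulls the ZooKeeper HA
 * settings out of the text of a flink-conf.yaml so that the client side uses
 * exactly the same values as the cluster.
 */
final class HaConfigSketch {

    // Key strings as they appear in the logged configuration (assumed Flink 1.0 naming).
    static final String RECOVERY_MODE = "recovery.mode";
    static final String ZOOKEEPER_QUORUM = "recovery.zookeeper.quorum";
    static final String ZOOKEEPER_ROOT = "recovery.zookeeper.path.root";

    /** Simplified parser: "key: value" per line, skipping blank lines and '#' comments. */
    static Map<String, String> parseFlat(String yaml) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : yaml.split("\\r?\\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            // Split on the first ": " so values such as "host:2181" stay intact.
            int sep = trimmed.indexOf(": ");
            if (sep > 0) {
                result.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 2).trim());
            }
        }
        return result;
    }

    /** Returns only the three HA settings, failing fast if any of them is missing. */
    static Map<String, String> haSettings(String yaml) {
        Map<String, String> all = parseFlat(yaml);
        Map<String, String> ha = new LinkedHashMap<>();
        for (String key : new String[] {RECOVERY_MODE, ZOOKEEPER_QUORUM, ZOOKEEPER_ROOT}) {
            String value = all.get(key);
            if (value == null) {
                throw new IllegalArgumentException("missing key in flink-conf.yaml: " + key);
            }
            ha.put(key, value);
        }
        return ha;
    }

    public static void main(String[] args) {
        String yaml = String.join("\n",
            "# values copied from the configuration logged in this thread",
            "recovery.mode: zookeeper",
            "recovery.zookeeper.quorum: slave01:2181,slave02:2181,master:2181",
            "recovery.zookeeper.path.root: /flink/recovery",
            "taskmanager.numberOfTaskSlots: 4");
        // In a patched interpreter, each entry would be copied into Flink's
        // Configuration with conf.setString(key, value) before conf is given to FlinkILoop.
        haSettings(yaml).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Failing fast on a missing key is a deliberate choice: a client that quietly runs without the ZooKeeper settings is one plausible way to end up with the "received leader session ID None" warning quoted in the thread.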
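Andrea's original approach reads host and port from the .yarn-properties file. According to Till, these values are ignored in HA mode, but the FlinkILoop constructor still requires them, so a small reader remains useful. This is a sketch under one explicit assumption: that a Flink 1.0 yarn-session stores the JobManager address as host:port under the key jobManager. Check your own file before relying on it (the log in this thread shows /tmp/.yarn-properties-flink). YarnPropertiesSketch and HostPort are hypothetical names.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Hypothetical reader for the .yarn-properties file written by yarn-session. */
final class YarnPropertiesSketch {

    /** JobManager host and port as recorded by the session. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // ASSUMPTION: Flink 1.0 writes the address as "jobManager=host:port".
    static final String JOBMANAGER_KEY = "jobManager";

    /** Extracts host and port from already-loaded properties. */
    static HostPort parse(Properties props) {
        String address = props.getProperty(JOBMANAGER_KEY);
        if (address == null) {
            throw new IllegalArgumentException("no '" + JOBMANAGER_KEY + "' entry in .yarn-properties");
        }
        // Split on the last ':' so only the trailing port is treated as the port.
        int colon = address.lastIndexOf(':');
        if (colon <= 0 || colon == address.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got " + address);
        }
        return new HostPort(address.substring(0, colon),
                            Integer.parseInt(address.substring(colon + 1)));
    }

    /** Loads the file with java.util.Properties, which also handles escaped ':' characters. */
    static HostPort load(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return parse(props);
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from the configuration logged in this thread.
        String path = args.length > 0 ? args[0] : "/tmp/.yarn-properties-flink";
        HostPort hp = load(path);
        System.out.println(hp.host + ":" + hp.port);
    }
}
```

Running "java YarnPropertiesSketch /tmp/.yarn-properties-flink" prints the recorded address. In an HA setup these values only satisfy the constructor; the actual leader address still has to come from ZooKeeper.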