flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrea Sella <andrea.se...@radicalbit.io>
Subject Re: Flink Interpreter w/ yarn-session
Date Thu, 21 Apr 2016 10:18:59 GMT
Hi Till,

It works as expected, thanks!

Andrea

2016-04-19 15:25 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:

> Hi Andrea,
>
> I think your problem should be fixed with the PRs [1,2]. I've tested it
> locally on my yarn cluster and it worked.
>
> [1] https://github.com/apache/flink/pull/1904
> [2] https://github.com/apache/flink/pull/1914
>
> Cheers,
> Till
>
> On Tue, Apr 19, 2016 at 2:16 PM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> I think this is another issue you’ve detected. I already spotted some
>> suspicious code in the yarn deployment section. If I’m not mistaken, then
>> flink-conf.yaml is read too late and is, thus, not respected. I’ll
>> verify it and if valid, then I’ll open another issue and fix it.
>>
>> Thanks for your patience and thorough reporting. It helps a lot :-)
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Apr 19, 2016 at 2:12 PM, Andrea Sella <andrea.sella@radicalbit.io
>> > wrote:
>>
>>> No, I tried it via scala-shell as you can see the attachment.
>>>
>>> Regards,
>>> Andrea
>>>
>>> 2016-04-19 14:08 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>
>>>> Hi Andrea,
>>>>
>>>> thanks for testing it. How did you submit the job this time? Via
>>>> Zeppelin?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Apr 19, 2016 at 12:51 PM, Andrea Sella <
>>>> andrea.sella@radicalbit.io> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> I've used your branch fixScalaShell to test the scala-shell with our
>>>>> HA cluster, it doesn't work. Same error as before
>>>>>
>>>>> 2016-04-19 06:40:35,030 WARN  org.apache.flink.yarn.YarnJobManager
>>>>>                      - Discard message
>>>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>> aa5b034e10a850d863642a24aab75d9c),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>> because the expected leader session ID
>>>>> Some(bc706707-2bab-4b82-b7a7-1426dce696a7) did not equal the received
>>>>> leader session ID None.
>>>>>
>>>>> If I submit a simple job, it works. I think it is not a problem of our
>>>>> environment.
>>>>>
>>>>> Cheers,
>>>>> Andrea
>>>>>
>>>>> 2016-04-18 18:41 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>
>>>>>> Cool, that helps a lot :-)
>>>>>>
>>>>>> On Mon, Apr 18, 2016 at 6:06 PM, Andrea Sella <
>>>>>> andrea.sella@radicalbit.io> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> Don't worry, I am going to test the PR in our HA environment.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Andrea
>>>>>>>
>>>>>>>
>>>>>>> 2016-04-18 17:46 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>
>>>>>>>> Hi Andrea,
>>>>>>>>
>>>>>>>> sorry I've seen your mail too late. I already fixed the problem
and
>>>>>>>> opened a PR [1] for it. I hope you haven't invested too much
time for it,
>>>>>>>> yet.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/pull/1904
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Mon, Apr 18, 2016 at 11:19 AM, Andrea Sella <
>>>>>>>> andrea.sella@radicalbit.io> wrote:
>>>>>>>>
>>>>>>>>> Hi Till,
>>>>>>>>> Thanks for the support, I will take the issue and starting
to work
>>>>>>>>> on it asap.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Andrea
>>>>>>>>>
>>>>>>>>> 2016-04-18 10:32 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>>>
>>>>>>>>>> Hi Andrea,
>>>>>>>>>>
>>>>>>>>>> I think the problem is simply that it has not been
correctly
>>>>>>>>>> implemented. I just checked and I think the user
configuration is not given
>>>>>>>>>> to the PlanExecutor which is internally created.
I’ve opened an
>>>>>>>>>> issue for that [1].
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-3774
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>> ​
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 15, 2016 at 4:58 PM, Andrea Sella <
>>>>>>>>>> andrea.sella@radicalbit.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Till,
>>>>>>>>>>>
>>>>>>>>>>> I've tried the Scala-Shell with our HA cluster,
no luck again.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Andrea
>>>>>>>>>>>
>>>>>>>>>>> 2016-04-15 14:43 GMT+02:00 Andrea Sella <
>>>>>>>>>>> andrea.sella@radicalbit.io>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using a branched version of 1.0.1 where
I cherry-picked
>>>>>>>>>>>> FLINK-2935
>>>>>>>>>>>> <https://github.com/radicalbit/flink/commit/dfbbb9e48c98b486baf279c396d1bf7de31c1f8c>
to
>>>>>>>>>>>> use FlinkILoop with Configuration. My Flink
interpreter is here
>>>>>>>>>>>> <https://github.com/radicalbit/incubator-zeppelin/blob/flink-yarn-interpreter/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java>,
>>>>>>>>>>>> I've started tweaking just two days ago and
as I can see there is a
>>>>>>>>>>>> Zeppelin issue
>>>>>>>>>>>> <https://issues.apache.org/jira/browse/ZEPPELIN-664>
to
>>>>>>>>>>>> provide FlinkInterpeter working with Yarn
and I need it too.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrea
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-04-15 14:20 GMT+02:00 Till Rohrmann
<trohrmann@apache.org>
>>>>>>>>>>>> :
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Andrea,
>>>>>>>>>>>>>
>>>>>>>>>>>>> which version of Flink are you using
with Zeppelin? How do you
>>>>>>>>>>>>> pass the Flink configuration to the FlinkILoop?
Could you maybe show me
>>>>>>>>>>>>> your version of Zeppelin (code).
>>>>>>>>>>>>>
>>>>>>>>>>>>> According to the log, the ScalaShellRemoteEnvironment
didn't
>>>>>>>>>>>>> get the Flink configuration with the
HA settings. Therefore, it still tries
>>>>>>>>>>>>> to connect to the jobmanager specified
by the host and port values. The
>>>>>>>>>>>>> functionality to pass in a Flink configuration
object to FlinkILoop has
>>>>>>>>>>>>> only been merged recently. You might
have to switch to the 1.1-SNAPSHOT
>>>>>>>>>>>>> version for that. This means that you
would have to update the Flink
>>>>>>>>>>>>> version in your Zeppelin branch to 1.1-SNAPSHOT
to make it work.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 15, 2016 at 1:03 PM, Andrea
Sella <
>>>>>>>>>>>>> andrea.sella@radicalbit.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks to follow me with this issue
:)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here the logs
>>>>>>>>>>>>>> <https://gist.github.com/alkagin/663fae1fc2993f0acd3ba66697f14093>,
>>>>>>>>>>>>>> are there enough?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As I wrote in the previous mail,
in the logs you can see also
>>>>>>>>>>>>>> the Configuration.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Andrea
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2016-04-15 10:07 GMT+02:00 Till Rohrmann
<
>>>>>>>>>>>>>> trohrmann@apache.org>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In HA mode, the host and port
information you provide to the
>>>>>>>>>>>>>>> Shell should
>>>>>>>>>>>>>>> be simply ignored. So you don't
have to retrieve them from
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> .yarn-properties file.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could you maybe run the FlinkInterpreter
with debug log
>>>>>>>>>>>>>>> level and share the
>>>>>>>>>>>>>>> logs with me? You can also do
that privately, if you don't
>>>>>>>>>>>>>>> want to share
>>>>>>>>>>>>>>> them on the mailing list.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I haven't tried it myself, but
I thought that the Shell also
>>>>>>>>>>>>>>> works with an
>>>>>>>>>>>>>>> HA cluster, because it uses the
same mechanism as the CLI,
>>>>>>>>>>>>>>> for example.
>>>>>>>>>>>>>>> I'll try it out later this day.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Apr 15, 2016 at 12:22
AM, Andrea Sella <
>>>>>>>>>>>>>>> andrea.sella@radicalbit.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Hi Till,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > The cluster has started
in HA.
>>>>>>>>>>>>>>> > I already patched Flink
interpreter to allow passing the
>>>>>>>>>>>>>>> Configuration to
>>>>>>>>>>>>>>> > FlinkILoop. Neverthless
I have to pass host and port to
>>>>>>>>>>>>>>> FlinkILoop, there
>>>>>>>>>>>>>>> > are required from FlinkILoop
constructor and I retrieve
>>>>>>>>>>>>>>> them from
>>>>>>>>>>>>>>> > .yarn-properties file.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I logged Flink Configuration:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > INFO [2016-04-14 17:52:58,141]
({pool-2-thread-2}
>>>>>>>>>>>>>>> > FlinkInterpreter.java[open]:96)
- Flink Configuration: {
>>>>>>>>>>>>>>> > recovery.mode=zookeeper,
host=yarn,
>>>>>>>>>>>>>>> > yarn-properties=/tmp/.yarn-properties-flink,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181,
>>>>>>>>>>>>>>> > recovery.zookeeper.path.root=/flink/recovery}
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > and I attach some logs:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Error displayed in paragraph
of Zeppelin
>>>>>>>>>>>>>>> > <
>>>>>>>>>>>>>>> https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > JobManager log
>>>>>>>>>>>>>>> > <
>>>>>>>>>>>>>>> https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Interpreter/FlinkILoop log
>>>>>>>>>>>>>>> > <
>>>>>>>>>>>>>>> https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I was looking Flink shell
and it works similar to the
>>>>>>>>>>>>>>> interpreter, do it
>>>>>>>>>>>>>>> > works with HA cluster?
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thank you,
>>>>>>>>>>>>>>> > Andrea
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > 2016-04-14 16:09 GMT+02:00
Till Rohrmann <
>>>>>>>>>>>>>>> trohrmann@apache.org>:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > > Hi Andrea,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > have you started the
Flink Yarn cluster in HA mode? Then
>>>>>>>>>>>>>>> the job manager
>>>>>>>>>>>>>>> > > address is stored in
ZooKeeper and you have to tell your
>>>>>>>>>>>>>>> FlinkILoop that
>>>>>>>>>>>>>>> > it
>>>>>>>>>>>>>>> > > should retrieve the
JobManager address from there. In
>>>>>>>>>>>>>>> order to do that
>>>>>>>>>>>>>>> > you
>>>>>>>>>>>>>>> > > have to set conf.setString(ConfigConstants.RECOVERY_MODE,
>>>>>>>>>>>>>>> > > "zookeeper"),
>>>>>>>>>>>>>>> conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
>>>>>>>>>>>>>>> > > "address of your zookeeper
cluster") and
>>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY,
>>>>>>>>>>>>>>> > > "flink dir you've set")
where conf is the flink
>>>>>>>>>>>>>>> configuration object. The
>>>>>>>>>>>>>>> > > values for the different
configuration values must match
>>>>>>>>>>>>>>> the values
>>>>>>>>>>>>>>> > > specified in the flink-conf.yaml
file. You then give the
>>>>>>>>>>>>>>> FlinkILoop the
>>>>>>>>>>>>>>> > > conf object.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > I’m not sure whether
you can specify a custom flink
>>>>>>>>>>>>>>> configuration in
>>>>>>>>>>>>>>> > > Zeppelin. I think you
can only specify a host and port.
>>>>>>>>>>>>>>> So either you
>>>>>>>>>>>>>>> > start
>>>>>>>>>>>>>>> > > you Flink cluster in
non-HA mode or you have to patch
>>>>>>>>>>>>>>> Zeppelin.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Cheers,
>>>>>>>>>>>>>>> > > Till
>>>>>>>>>>>>>>> > > ​
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > On Tue, Apr 12, 2016
at 5:12 PM, Andrea Sella <
>>>>>>>>>>>>>>> > andrea.sella@radicalbit.io>
>>>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > > Hi,
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > I am working to
allow Zeppelin's flink interpreter to
>>>>>>>>>>>>>>> connect an
>>>>>>>>>>>>>>> > existing
>>>>>>>>>>>>>>> > > > yarn cluster.
Yarn cluster has started via
>>>>>>>>>>>>>>> yarn-session and flink's
>>>>>>>>>>>>>>> > > version
>>>>>>>>>>>>>>> > > > is 1.0.0.
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > My approach is
to read host and port from
>>>>>>>>>>>>>>> .yarn-properties and pass
>>>>>>>>>>>>>>> > them
>>>>>>>>>>>>>>> > > to
>>>>>>>>>>>>>>> > > > IFlinkLoop.
>>>>>>>>>>>>>>> > > > Now I am facing
an issue with Session ID when I submit
>>>>>>>>>>>>>>> a paragraph to
>>>>>>>>>>>>>>> > > yarn
>>>>>>>>>>>>>>> > > > cluster.
>>>>>>>>>>>>>>> > > > The yarn cluster
throws a warning similar to:
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > 2016-04-12 10:14:32,666
WARN
>>>>>>>>>>>>>>> org.apache.flink.yarn.YarnJobManager
>>>>>>>>>>>>>>> > > >              
   - Discard message
>>>>>>>>>>>>>>> > > > LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>>>>>>>>>>>> > > > because the expected
leader session ID
>>>>>>>>>>>>>>> > > > Some(afc85978-f765-488b-acbb-79c2d7cb89e0)
did not
>>>>>>>>>>>>>>> equal the received
>>>>>>>>>>>>>>> > > > leader session
ID None.
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > My Zeppelin's
paragraph throws a
>>>>>>>>>>>>>>> > > JobClientActorSubmissionTimeoutException,
>>>>>>>>>>>>>>> > > > maybe is it due
to the missing sessionId? Do I need to
>>>>>>>>>>>>>>> pass extra
>>>>>>>>>>>>>>> > params
>>>>>>>>>>>>>>> > > to
>>>>>>>>>>>>>>> > > > connect correctly
to the yarn cluster or host and port
>>>>>>>>>>>>>>> are enough?
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > Thanks in advance,
>>>>>>>>>>>>>>> > > > Andrea
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message