flink-user mailing list archives

From Matt <dromitl...@gmail.com>
Subject Re: [BUG?] Cannot Load User Class on Local Environment
Date Mon, 29 May 2017 22:17:48 GMT
Thanks for looking into it, Till!

I'll try changing that line locally and then open a JIRA issue. Once it
gets officially fixed, I'll probably create an Ignite-Flink connector to
replace the older and less efficient one [1]. Users will be able to create
Flink jobs on Ignite nodes, right where the data is stored.

[1] https://apacheignite-mix.readme.io/docs/flink-streamer

Best,
Matt

On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Matt,
>
> I looked into it, and it seems that the Task does not respect the context
> class loader. The problem is that the local mode was not developed with the
> intention of being executed within something like Ignite or an application
> server. Instead, it assumes that you have a user code jar which is sent to
> the TaskManager. This jar is then added to a URLClassLoader which is used
> for user code class loading. In the local execution mode, Flink assumes
> that all user code jars are in the system class loader (which usually
> holds true when running examples from the IDE). That is why we don't check
> the thread context class loader (TCCL). To fix your problem, you can
> replace line 298 of BlobLibraryCacheManager.java with:
>
>     this.classLoader = new FlinkUserCodeClassLoader(libraryURLs,
>         Thread.currentThread().getContextClassLoader());
>
> Alternatively, you can build your job, copy the user code jar to
> IGNITE_HOME/libs, and then restart Ignite.
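
The idea behind Till's suggested change can be sketched with plain java.net.URLClassLoader instead of Flink's internal FlinkUserCodeClassLoader. This is a minimal illustration under assumptions, not Flink code: the class TcclParentLoader and the helper newUserCodeLoader are hypothetical. The user-code loader is created with the thread context class loader as its parent, so anything the TCCL can see (for example classes made available by Ignite) is also reachable from the user-code loader through parent delegation.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class TcclParentLoader {

    // Hypothetical helper: creates a user-code class loader for the given jar
    // URLs whose parent is the thread context class loader (TCCL) instead of
    // the system class loader. Falls back to this class's own loader when the
    // current thread has no TCCL set.
    public static URLClassLoader newUserCodeLoader(URL[] libraryURLs) {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = TcclParentLoader.class.getClassLoader();
        }
        return new URLClassLoader(libraryURLs, parent);
    }

    public static void main(String[] args) throws Exception {
        // No jars are added here, so every lookup is answered by the parent chain:
        // a class visible to the TCCL resolves to the very same Class object.
        try (URLClassLoader loader = newUserCodeLoader(new URL[0])) {
            Class<?> resolved = Class.forName(TcclParentLoader.class.getName(), false, loader);
            System.out.println(resolved == TcclParentLoader.class);
            System.out.println(loader.getParent() == Thread.currentThread().getContextClassLoader());
        }
    }
}
```

In the real patch the URLs would be the job's library jars; the parent choice is the only part that matters for the "Cannot load user class" error.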
>
> If you want the TCCL problem to be fixed properly, I suggest opening a
> JIRA issue here [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel
>
> Cheers,
> Till
>
> On Mon, May 29, 2017 at 12:02 PM, Matt <dromitlabs@gmail.com> wrote:
>
>> Hi Till,
>>
>> Have you found anything, or are you still busy with the release? I have
>> no idea what may be wrong, but let me know if I can help you in any way
>> to find out what is going on.
>>
>> Best,
>> Matt
>>
>> On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <trohrmann@apache.org>
>> wrote:
>>
>>> Hi Matt,
>>>
>>> sorry for not getting back to you sooner. We're currently in the release
>>> phase, and this takes up a lot of capacity.
>>>
>>> I tried to go to the linked repo, but GitHub tells me that it does not
>>> exist. Have you removed it?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, May 17, 2017 at 10:56 PM, Matt <dromitlabs@gmail.com> wrote:
>>>
>>>> Check the repo at [1].
>>>>
>>>> The important step, which I think you missed, is running an Ignite
>>>> node on your computer, so that the Java code (which launches an Ignite
>>>> client in the JVM) connects to it and executes Flink on that node in a
>>>> local environment.
>>>>
>>>> Be aware that "peerClassLoadingEnabled" must be enabled (as in
>>>> ignite.xml), because it has to match the config on the client node.
>>>>
>>>> Everything is described in the README; if you run into any problem,
>>>> let me know!
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>> [1] https://github.com/Dromit/FlinkTest
>>>>
>>>> On Wed, May 17, 2017 at 3:49 PM, Matt <dromitlabs@gmail.com> wrote:
>>>>
>>>>> Thanks for your help Till.
>>>>>
>>>>> I will create a self-contained test case in a moment and send you the
>>>>> link, wait for it.
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrmann@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>> alright, then we have to look into it again. I tried to run your
>>>>>> example; however, it does not seem to be self-contained. Using Ignite
>>>>>> 2.0.0 with -DIGNITE_QUIET=false -Xms512m, the Ignite object seems to be
>>>>>> stuck in Ignite#start. In the logs I see the following warnings:
>>>>>>
>>>>>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>>>>>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>> WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.
>>>>>>
>>>>>> However, I assume that this is not critical.
>>>>>>
>>>>>> Maybe you can tell me how to run your example so that I can debug it.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitlabs@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> I just tried with Flink 1.4 by compiling the current master branch
>>>>>>> on GitHub (as of this morning), and I still see the same problem as
>>>>>>> before. If I'm not mistaken, your PR has already been merged, so your
>>>>>>> fixes should be part of the binary.
>>>>>>>
>>>>>>> I hope you have time to have a look at the test case in [1].
>>>>>>>
>>>>>>> Best,
>>>>>>> Matt
>>>>>>>
>>>>>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>>>>
>>>>>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitlabs@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> Great! Do you know whether it's planned to be included in v1.2.x, or
>>>>>>>> should we wait for v1.3? I'll give it a try as soon as it's merged.
>>>>>>>>
>>>>>>>> You're right that this approach launches a mini cluster on each
>>>>>>>> Ignite node. That is intentional, as described in my previous message
>>>>>>>> on the list [1].
>>>>>>>>
>>>>>>>> The idea is to collocate Flink jobs with Ignite nodes, so that each
>>>>>>>> dataflow only processes the elements stored in the local in-memory
>>>>>>>> database. My impression is that this should be much faster than
>>>>>>>> randomly picking a Flink node and sending all the data over the
>>>>>>>> network.
>>>>>>>>
>>>>>>>> Any insight on this?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>>>>>>
>>>>>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <
>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>
>>>>>>>>> I just copied my response because my other email address is not
>>>>>>>>> accepted on the user mailing list.
>>>>>>>>>
>>>>>>>>> Hi Matt,
>>>>>>>>>
>>>>>>>>> I think Stefan's analysis is correct. I have a PR open [1] where I
>>>>>>>>> fix the issue with the class loader.
>>>>>>>>>
>>>>>>>>> As a side note, by doing what you're doing, you will spawn a new
>>>>>>>>> Flink mini cluster on each Ignite node. These mini clusters won't
>>>>>>>>> communicate with each other and will run independently. Is this what
>>>>>>>>> you intend to do?
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/flink/pull/3781
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitlabs@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Let's wait for Till then; I hope he can figure this out.
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter
>>>>>>>>>> <s.richter@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, now the question is also which classloaders Ignite creates and
>>>>>>>>>>> how they are used, but the relevant code line in Flink is probably
>>>>>>>>>>> in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>>>>>>
>>>>>>>>>>> try {
>>>>>>>>>>>   JobClient.submitJobAndWait(
>>>>>>>>>>>     clientActorSystem,
>>>>>>>>>>>     configuration,
>>>>>>>>>>>     leaderRetrievalService,
>>>>>>>>>>>     jobGraph,
>>>>>>>>>>>     timeout,
>>>>>>>>>>>     printUpdates,
>>>>>>>>>>>     this.getClass.getClassLoader())
>>>>>>>>>>> } finally {
>>>>>>>>>>>   if (!useSingleActorSystem) {
>>>>>>>>>>>     // we have to shutdown the just created actor system
>>>>>>>>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is what is executed as part of executing a job through
>>>>>>>>>>> LocalEnvironment. As we can see, the classloader is set to the
>>>>>>>>>>> classloader of FlinkMiniCluster. Depending on the classloader
>>>>>>>>>>> structure inside Ignite, this classloader might not know your user
>>>>>>>>>>> code. What you could do is change this line in a custom Flink build,
>>>>>>>>>>> for example changing line 538 to pass
>>>>>>>>>>> Thread.currentThread().getContextClassLoader() instead, and ensure
>>>>>>>>>>> that the context classloader in the runnable is a classloader that
>>>>>>>>>>> a) knows the user code and b) is a child of the classloader that
>>>>>>>>>>> knows the Ignite and Flink classes. Notice that this is not a
>>>>>>>>>>> general solution and should not become a general fix.
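
Stefan's second requirement (installing a suitable context classloader inside the runnable) usually follows a set-and-restore pattern. The following is a hedged sketch, not code from Flink or Ignite: ContextClassLoaderSwap and withContextClassLoader are hypothetical names, and the empty URLClassLoader merely stands in for a loader that knows the user code. It only has an effect once Flink actually reads the TCCL, which, according to this thread, required the patched line.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSwap {

    // Hypothetical helper: runs `task` with `loader` installed as the thread
    // context class loader, then restores the previous TCCL even if the task throws.
    public static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return task.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Stand-in for a loader that knows the user code (a), created as a child of
        // the loader that already sees the framework classes (b). No jars added here.
        try (URLClassLoader userCodeLoader =
                 new URLClassLoader(new URL[0], ContextClassLoaderSwap.class.getClassLoader())) {
            ClassLoader seenInside = withContextClassLoader(
                userCodeLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println(seenInside == userCodeLoader);
            System.out.println(Thread.currentThread().getContextClassLoader() == before);
        }
    }
}
```

Restoring the previous loader in a finally block matters on pooled threads such as Ignite's compute threads, where a leaked TCCL would affect unrelated tasks.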
>>>>>>>>>>>
>>>>>>>>>>> I have heard that Till is about to change some things about local
>>>>>>>>>>> execution, so I included him in CC. Maybe he can provide additional
>>>>>>>>>>> hints on how your use case might be better supported in the
>>>>>>>>>>> upcoming Flink 1.3.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Stefan
>>>>>>>>>>>
>>>>>>>>>>> On 25.04.2017 at 22:50, Matt <dromitlabs@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I updated the code a little bit for clarity; line #56 mentioned in
>>>>>>>>>>> my previous message is now line #25.
>>>>>>>>>>>
>>>>>>>>>>> In summary the error I'm getting is this:
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>>>>> Class not resolvable through given classloader.
>>>>>>>>>>> ---
>>>>>>>>>>>
>>>>>>>>>>> But if I'm not mistaken, after trying to load the class through the
>>>>>>>>>>> URLClassLoader, Flink should try loading it with its parent
>>>>>>>>>>> ClassLoader, which should be the same ClassLoader that executed the
>>>>>>>>>>> environment, and that one does have access to the class.
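
The delegation argument above hinges on which parent the user-code loader was created with. The hedged Java sketch below (DelegationDemo and canLoad are hypothetical names) shows that a plain java.net.URLClassLoader only walks up its own parent chain: with identical, empty URL lists, an application class resolves when the parent is the application loader and fails when the parent is the bootstrap loader. Per Till's explanation elsewhere in this thread, Flink's local mode used the system class loader as that parent, which does not see classes made available by Ignite.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class DelegationDemo {

    // Hypothetical helper: true if `name` can be resolved through `loader`.
    // The loader consults its parent chain first, then its own URLs.
    public static boolean canLoad(String name, ClassLoader loader) {
        try {
            Class.forName(name, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String appClass = DelegationDemo.class.getName();
        try (URLClassLoader childOfAppLoader =
                 new URLClassLoader(new URL[0], DelegationDemo.class.getClassLoader());
             URLClassLoader childOfBootstrap = new URLClassLoader(new URL[0], null)) {
            // Same (empty) URL list, different parents, different results:
            System.out.println(canLoad(appClass, childOfAppLoader));           // true
            System.out.println(canLoad(appClass, childOfBootstrap));           // false
            System.out.println(canLoad("java.lang.String", childOfBootstrap)); // true
        }
    }
}
```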
>>>>>>>>>>>
>>>>>>>>>>> Not sure what is wrong.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitlabs@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>
>>>>>>>>>>>> Check the code here:
>>>>>>>>>>>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d (the
>>>>>>>>>>>> output is at the bottom of the page).
>>>>>>>>>>>>
>>>>>>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the
>>>>>>>>>>>>    Ignite closure; no problem with that.
>>>>>>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test
>>>>>>>>>>>>    itself, and I was able to instantiate the class inside the
>>>>>>>>>>>>    Ignite closure.
>>>>>>>>>>>> 3. I'm not sure what you meant by this point; is it something like
>>>>>>>>>>>>    what I tried in line #56?
>>>>>>>>>>>>
>>>>>>>>>>>> Additionally, I tried implementing SourceFunction and SinkFunction
>>>>>>>>>>>> in Test$Foo, with the same result: it says "Cannot load user class:
>>>>>>>>>>>> com.test.Test$Foo".
>>>>>>>>>>>>
>>>>>>>>>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Matt
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter
>>>>>>>>>>>> <s.richter@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would expect the local environment to pick up the class path
>>>>>>>>>>>>> from the code that launched it. So I think the question is what
>>>>>>>>>>>>> happens behind the scenes when you call
>>>>>>>>>>>>> ignite.compute().broadcast(runnable);. Which classes are shipped,
>>>>>>>>>>>>> and how is the classpath built in the environment that runs the
>>>>>>>>>>>>> code? Your example is also not fully conclusive, because
>>>>>>>>>>>>> com.myproj.Test (which you can successfully instantiate) and
>>>>>>>>>>>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe
>>>>>>>>>>>>> only the outer class is shipped with the broadcast call. My theory
>>>>>>>>>>>>> is that not all classes are shipped (e.g. inner classes), but only
>>>>>>>>>>>>> Test. You could try three things to analyze the problem a little
>>>>>>>>>>>>> more:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) Create another inner class inside Test and check whether you
>>>>>>>>>>>>>    can still instantiate this class via reflection.
>>>>>>>>>>>>> 2) Let the Test class itself implement the map function (avoiding
>>>>>>>>>>>>>    the usage of other/inner classes) and see if this works.
>>>>>>>>>>>>> 3) Check and set the thread’s context classloader inside the
>>>>>>>>>>>>>    runnable to something that contains all required classes, and
>>>>>>>>>>>>>    see if this gets picked up by Flink.
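
Stefan's first suggestion can be tried with a diagnostic along the following lines. This is a hedged sketch, not part of either project: ReflectionProbe, Foo and probe are hypothetical names. It shows that a nested class is a separate class with its own binary name (Outer$Foo, or a compiler-numbered name such as Outer$1 for anonymous classes), and it tries to load and instantiate such a class reflectively through a given class loader, returning the exception text instead of failing.

```java
public class ReflectionProbe {

    // Static nested class: compiled to its own class file with binary name ReflectionProbe$Foo.
    public static class Foo {
        public Foo() {}
    }

    // Hypothetical diagnostic: loads `binaryName` through `loader` and instantiates it
    // with its public no-arg constructor, returning a short status string.
    public static String probe(String binaryName, ClassLoader loader) {
        try {
            Object instance = Class.forName(binaryName, true, loader)
                .getDeclaredConstructor()
                .newInstance();
            return "OK: " + instance.getClass().getName();
        } catch (ReflectiveOperationException e) {
            return "FAILED: " + e;
        }
    }

    public static void main(String[] args) {
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        // Nested classes are addressed with '$' in their binary name.
        System.out.println(probe(ReflectionProbe.class.getName() + "$Foo", tccl));
        System.out.println(probe(ReflectionProbe.class.getName() + "$Missing", tccl));
        // Anonymous classes get compiler-generated numbered names (e.g. ReflectionProbe$1).
        Runnable anonymous = new Runnable() {
            @Override
            public void run() {}
        };
        System.out.println(anonymous.getClass().getName());
    }
}
```

Running the same probe with different loaders (the TCCL inside the Ignite closure versus the loader Flink uses) would show which of them can actually see the nested classes.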
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Stefan
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 25.04.2017 at 07:27, Matt <dromitlabs@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to run Flink using a local environment, but on an
>>>>>>>>>>>>> Ignite node to achieve collocation (as mentioned in my previous
>>>>>>>>>>>>> message on this list).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm
>>>>>>>>>>>>> getting a "cannot load user class" error as shown in [2].
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you check line #29 of the code, I'm able to create an instance
>>>>>>>>>>>>> of class Test, and it's the same context from which I'm creating
>>>>>>>>>>>>> the Flink job. Shouldn't it work, given that I'm using a local
>>>>>>>>>>>>> environment?
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would be really nice to be able to inject a ClassLoader into
>>>>>>>>>>>>> the chunk of code that creates the job. Is this currently
>>>>>>>>>>>>> possible?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Matt
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
