flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: [BUG?] Cannot Load User Class on Local Environment
Date Tue, 30 May 2017 06:13:54 GMT
Sounds great Matt :-)

On Tue, May 30, 2017 at 12:17 AM, Matt <dromitlabs@gmail.com> wrote:

> Thanks for looking into it Till!
>
> I'll try changing that line locally and then open a JIRA issue. When it
> gets officially fixed I'll probably create an Ignite-Flink connector to
> replace the older and less efficient one [1]. Users will be able to create
> Flink jobs on Ignite nodes, right where the data is stored.
>
> [1] https://apacheignite-mix.readme.io/docs/flink-streamer
>
> Best,
> Matt
>
> On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Matt,
>>
>> I looked into it and it seems that the Task does not respect the context
>> class loader. The problem is that the local mode was not developed with the
>> intention to be executed within something like Ignite or an application
>> server. It rather assumes that you have a user code jar which is sent to
>> the TaskManager. This jar is then added to a URLClassLoader which is used
>> for user code class loading. In the case of the local execution mode, Flink
>> assumes that all user code jars are in the system class loader (which
>> usually holds true when running examples from the IDE). That is the reason
>> why we don’t check the thread context class loader (TCCL). In order to fix
>> your problem you can replace BlobLibraryCacheManager.java:298 with
>>
>>   this.classLoader = new FlinkUserCodeClassLoader(libraryURLs,
>>       Thread.currentThread().getContextClassLoader());
>>
>> Alternatively, you can build your job, copy the user code jar to
>> IGNITE_HOME/libs and then restart Ignite.
>>
>> If you want to get the TCCL problem properly fixed, I suggest opening a
>> JIRA issue here [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel
>>
>> Cheers,
>> Till
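
A minimal sketch of the parent-loader idea from Till's reply, not Flink's actual code. It uses a plain java.net.URLClassLoader as a stand-in for FlinkUserCodeClassLoader and passes the thread context class loader (TCCL) as its parent. The class name TcclParentSketch and the method userCodeLoader are hypothetical and only serve the illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Sketch: a user-code class loader whose parent is the thread context class loader. */
final class TcclParentSketch {

    /**
     * Builds a loader for the given jar URLs that delegates to the caller's TCCL.
     * Falls back to the system class loader when the thread has no TCCL set.
     */
    static URLClassLoader userCodeLoader(URL[] libraryUrls) {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(libraryUrls, parent);
    }

    public static void main(String[] args) throws Exception {
        // No extra jars here, so every lookup is answered by the parent chain.
        try (URLClassLoader loader = userCodeLoader(new URL[0])) {
            Class<?> resolved = loader.loadClass("java.lang.String");
            System.out.println("parent loader: " + loader.getParent());
            System.out.println("resolved through parent chain: " + resolved.getName());
        }
    }
}
```

With an empty URL array every lookup is delegated to the parent chain, which is the point of the change: classes visible to the thread that starts the job stay visible to the user code loader.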
>>
>> On Mon, May 29, 2017 at 12:02 PM, Matt <dromitlabs@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Have you found anything or are you still busy with the release? I have
>>> no idea what may be wrong, but let me know if I can help you in any way to
>>> find what may be going on.
>>>
>>> Best,
>>> Matt
>>>
>>> On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <trohrmann@apache.org>
>>> wrote:
>>>
>>>> Hi Matt,
>>>>
>>>> sorry for not coming back to you sooner. We're currently in the release
>>>> phase and this consumes a lot of capacity.
>>>>
>>>> I tried to go to the linked repo, but GitHub tells me that it does not
>>>> exist. Have you removed it?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, May 17, 2017 at 10:56 PM, Matt <dromitlabs@gmail.com> wrote:
>>>>
>>>>> Check the repo at [1].
>>>>>
>>>>> The important step, which I think is what you missed, is running an
>>>>> Ignite node on your computer so the Java code, which launches an Ignite
>>>>> client on the JVM, connects to it and executes Flink on that node in a
>>>>> local environment.
>>>>>
>>>>> Be aware "peerClassLoadingEnabled" should be enabled (as in
>>>>> ignite.xml), because it must match the config on the client node.
>>>>>
>>>>> If you follow the README file, everything is there. If you have any
>>>>> problem, let me know!
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> [1] https://github.com/Dromit/FlinkTest
>>>>>
>>>>> On Wed, May 17, 2017 at 3:49 PM, Matt <dromitlabs@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your help Till.
>>>>>>
>>>>>> I will create a self-contained test case in a moment and send you the
>>>>>> link, wait for it.
>>>>>>
>>>>>> Cheers,
>>>>>> Matt
>>>>>>
>>>>>> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrmann@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Matt,
>>>>>>>
>>>>>>> alright, then we have to look into it again. I tried to run your
>>>>>>> example, however, it does not seem to be self-contained. Using Ignite 2.0.0
>>>>>>> with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
>>>>>>> Ignite#start. In the logs I see the following warning:
>>>>>>>
>>>>>>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>>>>>>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>>> WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.
>>>>>>>
>>>>>>> However, I assume that this is not critical.
>>>>>>>
>>>>>>> Maybe you can tell me how I can run your example in order to debug
>>>>>>> it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitlabs@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> I just tried with Flink 1.4 by compiling the current master branch
>>>>>>>> on GitHub (as of this morning) and I still find the same problem as before.
>>>>>>>> If I'm not wrong your PR was merged already, so your fixes should be part
>>>>>>>> of the binary.
>>>>>>>>
>>>>>>>> I hope you have time to have a look at the test case in [1].
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>>>>>
>>>>>>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitlabs@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Till,
>>>>>>>>>
>>>>>>>>> Great! Do you know if it's planned to be included in v1.2.x or
>>>>>>>>> should we wait for v1.3? I'll give it a try as soon as it's merged.
>>>>>>>>>
>>>>>>>>> You're right about this approach launching a mini cluster on each
>>>>>>>>> Ignite node. That is intentional, as described in my previous message on
>>>>>>>>> the list [1].
>>>>>>>>>
>>>>>>>>> The idea is to collocate Flink jobs on Ignite nodes, so each
>>>>>>>>> dataflow only processes the elements stored on the local in-memory
>>>>>>>>> database. I get the impression this should be much faster than randomly
>>>>>>>>> picking a Flink node and sending all the data over the network.
>>>>>>>>>
>>>>>>>>> Any insight on this?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Matt
>>>>>>>>>
>>>>>>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>>>>>>>
>>>>>>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrmann@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I just copied my response because my other email address is not
>>>>>>>>>> accepted on the user mailing list.
>>>>>>>>>>
>>>>>>>>>> Hi Matt,
>>>>>>>>>>
>>>>>>>>>> I think Stefan's analysis is correct. I have a PR open [1], where
>>>>>>>>>> I fix the issue with the class loader.
>>>>>>>>>>
>>>>>>>>>> As a side note, by doing what you're doing, you will spawn a new
>>>>>>>>>> Flink mini cluster on each Ignite node. These mini clusters won't
>>>>>>>>>> communicate with each other and run independently. Is this what you intend
>>>>>>>>>> to do?
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/pull/3781
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitlabs@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Let's wait for Till then, I hope he can figure this out.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <s.richter@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok, now the question is also about what classloaders Ignite is
>>>>>>>>>>>> creating and how they are used, but the relevant code line in Flink is
>>>>>>>>>>>> probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>>>>>>>
>>>>>>>>>>>> try {
>>>>>>>>>>>>   JobClient.submitJobAndWait(
>>>>>>>>>>>>     clientActorSystem,
>>>>>>>>>>>>     configuration,
>>>>>>>>>>>>     leaderRetrievalService,
>>>>>>>>>>>>     jobGraph,
>>>>>>>>>>>>     timeout,
>>>>>>>>>>>>     printUpdates,
>>>>>>>>>>>>     this.getClass.getClassLoader())
>>>>>>>>>>>> } finally {
>>>>>>>>>>>>   if (!useSingleActorSystem) {
>>>>>>>>>>>>     // we have to shutdown the just created actor system
>>>>>>>>>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is what is executed as part of executing a job through
>>>>>>>>>>>> LocalEnvironment. As we can see, the classloader is set to the classloader
>>>>>>>>>>>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>>>>>>>>>>>> this classloader might not know your user code. What you could do is
>>>>>>>>>>>> change this line in a custom Flink build, for example changing line 538
>>>>>>>>>>>> to Thread.currentThread().getContextClassLoader(), and ensure
>>>>>>>>>>>> that the context classloader in the runnable is a classloader that a)
>>>>>>>>>>>> knows the user code and b) is a child of the classloader that knows the
>>>>>>>>>>>> Ignite and Flink classes. Notice that this is not a general solution and
>>>>>>>>>>>> should not become a general fix.
>>>>>>>>>>>>
>>>>>>>>>>>> I have heard that Till is about to change some things about
>>>>>>>>>>>> local execution, so I included him in CC. Maybe he can provide additional
>>>>>>>>>>>> hints how your use case might be better supported in the upcoming Flink 1.3.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Stefan
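
Stefan's suggestion to control the context classloader inside the runnable can be sketched as below. This is an illustration under assumptions, not code from Flink or Ignite: the class ContextLoaderScope and its method runWith are hypothetical names, and the loader handed in is assumed to already satisfy conditions a) and b) from the message above.

```java
/** Sketch: run a task with a chosen thread context class loader, then restore the old one. */
final class ContextLoaderScope {

    /**
     * Sets the given loader as the current thread's context class loader for the
     * duration of the task. The previous loader is restored even if the task throws.
     */
    static void runWith(ClassLoader loader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Assumption: the loader that loaded this class also knows the user code.
        ClassLoader userLoader = ContextLoaderScope.class.getClassLoader();
        runWith(userLoader, () ->
            System.out.println("TCCL inside task: "
                + Thread.currentThread().getContextClassLoader()));
    }
}
```

Restoring the previous loader in a finally block matters when the runnable is executed on a pooled thread, since a leftover context classloader would otherwise leak into unrelated work on that thread.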
>>>>>>>>>>>>
>>>>>>>>>>>> On 25.04.2017 at 22:50, Matt <dromitlabs@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I updated the code a little bit for clarity, now the line #56
>>>>>>>>>>>> mentioned in my previous message is line #25.
>>>>>>>>>>>>
>>>>>>>>>>>> In summary the error I'm getting is this:
>>>>>>>>>>>>
>>>>>>>>>>>> ---
>>>>>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>>>>>> Class not resolvable through given classloader.
>>>>>>>>>>>> ---
>>>>>>>>>>>>
>>>>>>>>>>>> But if I'm not wrong, after trying to load the class through
>>>>>>>>>>>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>>>>>>>>>>>> which should be the same ClassLoader that executed the environment, and it
>>>>>>>>>>>> does have access to the class.
>>>>>>>>>>>>
>>>>>>>>>>>> Not sure what is wrong.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitlabs@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Check the code here:
>>>>>>>>>>>>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>>>>>>>>>> The output is at the bottom of the page.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside
>>>>>>>>>>>>> the Ignite closure, no problem with that
>>>>>>>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in
>>>>>>>>>>>>> Test itself, I was able to instantiate the class inside the Ignite closure
>>>>>>>>>>>>> 3. I'm not sure what you meant in this point, is it something
>>>>>>>>>>>>> like what I tried in line #56?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Additionally, I tried implementing the SourceFunction and
>>>>>>>>>>>>> SinkFunction in Test$Foo with the same result: it says "Cannot load user
>>>>>>>>>>>>> class: com.test.Test$Foo"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looks like Flink is not using the correct ClassLoader. Any
>>>>>>>>>>>>> idea?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Matt
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <s.richter@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would expect that the local environment picks up the class
>>>>>>>>>>>>>> path from the code that launched it. So I think the question is what
>>>>>>>>>>>>>> happens behind the scenes when you call
>>>>>>>>>>>>>> ignite.compute().broadcast(runnable): which classes are shipped, and how is
>>>>>>>>>>>>>> the classpath built in the environment that runs the code? Your example is
>>>>>>>>>>>>>> also not fully conclusive, because com.myproj.Test (which you can
>>>>>>>>>>>>>> successfully instantiate) and com.myproj.Test$1$2 (which fails) are
>>>>>>>>>>>>>> different classes, so maybe only the outer class is shipped with the
>>>>>>>>>>>>>> broadcast call. My theory is that not all classes are shipped (e.g. inner
>>>>>>>>>>>>>> classes), but only Test. You could try three things to analyze the problem
>>>>>>>>>>>>>> a little more:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) Create another inner class inside Test and check whether you are
>>>>>>>>>>>>>> still able to instantiate this class via reflection as well.
>>>>>>>>>>>>>> 2) Let the Test class itself implement the map function (avoiding
>>>>>>>>>>>>>> the usage of other/inner classes) and see if this works.
>>>>>>>>>>>>>> 3) Check and set the thread’s context classloader inside the
>>>>>>>>>>>>>> runnable to something that contains all required classes and see if this
>>>>>>>>>>>>>> gets picked up by Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Stefan
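
The checks Stefan lists can be supported by a small diagnostic like the following sketch. It relies only on standard JDK calls; the class name LoaderDiagnostics and its methods are hypothetical, and the class names to probe (for example com.myproj.Test$1$2 from the message above) are passed in as command-line arguments rather than assumed to exist.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: inspect which classes a loader can see and what its parent chain looks like. */
final class LoaderDiagnostics {

    /** Returns true if the named class can be resolved through the given loader. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only look the class up, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Lists the loader and all of its parents, ending with the bootstrap loader. */
    static List<String> chain(ClassLoader loader) {
        List<String> names = new ArrayList<>();
        for (ClassLoader cl = loader; cl != null; cl = cl.getParent()) {
            names.add(cl.toString());
        }
        names.add("bootstrap");
        return names;
    }

    public static void main(String[] args) {
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        System.out.println("TCCL chain: " + chain(tccl));
        for (String name : args) {
            System.out.println(name + " visible via TCCL: " + isVisible(name, tccl));
        }
    }
}
```

Calling the same two methods from inside the broadcast runnable, once with the context classloader and once with the loader of the enclosing class, would show whether the inner classes are reachable from either of them.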
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 25.04.2017 at 07:27, Matt <dromitlabs@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to run Flink using a local environment, but on an
>>>>>>>>>>>>>> Ignite node to achieve collocation (as mentioned in my previous message on
>>>>>>>>>>>>>> this list).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm
>>>>>>>>>>>>>> getting a "cannot load user class" error as shown in [2].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you check line #29 in the code, I'm able to create an
>>>>>>>>>>>>>> instance of class Test, and it's the same context from which I'm creating
>>>>>>>>>>>>>> the Flink job. Shouldn't it work provided I'm using a local environment?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It would be really nice to be able to inject a ClassLoader
>>>>>>>>>>>>>> into the chunk of code that creates the job. Is this currently possible?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Matt
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
