flink-dev mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: neo4j - Flink connector
Date Fri, 13 Nov 2015 12:31:44 GMT
Have a look at my updated version of your code:
https://github.com/rmetzger/scratch/tree/dependency_problem
It now executes both tests; however, I was not able to get the second test
to pass. It seems that Neo4j's web server returns a 500 status code when
open() opens the connection.
I'm not sure how to debug this issue.
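One way to start debugging the 500 is to look at the response body that comes with it. For status codes of 400 and above, `HttpURLConnection.getInputStream()` throws, so whatever explanation the server sends (if any) is only readable via `getErrorStream()`. The sketch below assumes the input format sends a POST with a JSON body; the class name `HttpErrorBody` is hypothetical, and the URL and payload you pass in would be whatever `open()` actually sends.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical debugging helper: send a POST and return the status code
// together with the response body, including the body of error responses.
public final class HttpErrorBody {

    public static final class Result {
        public final int status;
        public final String body;

        Result(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    private HttpErrorBody() {}

    public static Result post(String url, String jsonBody, String authorization) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Accept", "application/json");
            conn.setRequestProperty("Content-Type", "application/json");
            if (authorization != null) {
                conn.setRequestProperty("Authorization", authorization);
            }
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            }
            int status = conn.getResponseCode();
            // For status >= 400, getInputStream() throws; the server's
            // explanation (if any) is only readable from getErrorStream().
            InputStream in = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
            return new Result(status, in == null ? "" : readAll(in));
        } finally {
            conn.disconnect();
        }
    }

    private static String readAll(InputStream in) throws IOException {
        try (InputStream source = in) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = source.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

Printing `result.body` from a test that reproduces the failing request may show what the server reports alongside the 500.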

On Thu, Nov 12, 2015 at 2:19 PM, Martin Junghanns <m.junghanns@mailbox.org>
wrote:

> Hi Robert,
>
> Thank you for the reply! At the moment we are just "playing" with Neo4j and
> Flink, but the InputFormat should eventually become available in Flink.
>
> Concerning the license: I had not thought of that, but yes, I can make it
> available in Maven Central. I just need to find out how to do this.
>
> I created a branch that reproduces the dependency problem [1]. There is a
> test case "neo4jOnly" [2] which does not use Flink and works fine in a
> project where only neo4j-harness is included. However, when I add
> flink-java and flink-gelly (excluding flink-clients because of jetty) to
> the project, the neo4jOnly test fails with:
>
> org.neo4j.server.ServerStartupException: Starting Neo4j failed:
> com.sun.jersey.core.reflection.ReflectionHelper.classForNamePA(Ljava/lang/String;)Ljava/security/PrivilegedAction;
>
> I compared the dependencies with those of the "clean" neo4j-harness project
> and made sure the dependencies and versions are the same. ReflectionHelper
> is part of jersey-core, which is included.
>
> This is really weird because, as I wrote before, the simple neo4jOnly test
> ran a few days ago. Were there any dependency changes in 0.10-SNAPSHOT?
> Even once this is fixed, the next failure will be caused by the
> scala-library version conflict.
>
> Again, thanks for your help.
>
> Best,
> Martin
>
> [1] https://github.com/s1ck/flink-neo4j/tree/dependency_problem
> [2] https://github.com/s1ck/flink-neo4j/blob/dependency_problem/src/test/java/org/apache/flink/api/java/io/neo4j/Neo4jInputTest.java#L32
>
>
> On 12.11.2015 12:51, Robert Metzger wrote:
>
>> Sorry for the delay.
>> So the plan for this work is to add a Neo4j connector to Flink, right?
>>
>> While looking at the pom files of Neo4j, I found that it is GPLv3-licensed,
>> and Apache projects cannot depend on or link with GPL code [1].
>> So we cannot make the module part of the Flink source.
>> However, it's actually quite easy to publish code to Maven Central, so you
>> could release it on your own.
>> If that is too much work for you, I can also start a GitHub project like
>> "flink-gpl" with access to Maven Central, where we can release things like
>> this.
>>
>> Is this repository [2] your current work in progress on the dependency
>> issue?
>> Maybe the Neo4j dependency expects Scala 2.11 and there is no Scala 2.10
>> build available. In this case, we could require Flink users to use the
>> Scala 2.11 build of Flink when they want to use Neo4j.
>> I think I can help you much better as soon as I have your current pom file
>> and code.
>>
>> [1] http://www.apache.org/legal/resolved.html#category-a
>> [2] https://github.com/s1ck/flink-neo4j
>>
>>
>> On Wed, Nov 11, 2015 at 7:38 PM, Martin Junghanns <m.junghanns@mailbox.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I am a bit stuck with that dependency problem. Any help would be
>>> appreciated as I would like to continue working on the formats. Thanks!
>>>
>>> Best,
>>> Martin
>>>
>>>
>>> On 07.11.2015 17:28, Martin Junghanns wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> Thank you for the hints. I tried to narrow down the error:
>>>>
>>>> Flink version: 0.10-SNAPSHOT
>>>> Neo4j version: 2.3.0
>>>>
>>>> I started with two dependencies:
>>>> flink-java
>>>> flink-gelly
>>>>
>>>> (1) Add neo4j-harness and run the basic example from Neo4j [1].
>>>> Leads to:
>>>>
>>>> java.lang.ClassNotFoundException:
>>>> org.eclipse.jetty.server.ConnectionFactory
>>>>
>>>> (2) I excluded jetty-server from flink-java and flink-gelly.
>>>> It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1).
>>>> Leads to:
>>>>
>>>> java.lang.NoSuchMethodError:
>>>> org.eclipse.jetty.servlet.ServletContextHandler.<init>
>>>>
>>>> (3) I excluded jetty-servlet from flink-java and flink-gelly.
>>>> It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1).
>>>> Leads to:
>>>>
>>>> java.lang.NoSuchMethodError: scala.Predef$.$conforms()
>>>>
>>>> (4) I excluded scala-library from flink-java and flink-gelly.
>>>> It now uses scala-library:2.11.7 (was 2.10.4).
>>>>
>>>> Now the basic Neo4j example (without Flink) runs.
>>>>
>>>> Next, I added Flink to the mix and wrote a simple test using
>>>> neo4j-harness features, ExecutionEnvironment and my InputFormat.
>>>> Leads to:
>>>>
>>>> java.lang.NoSuchMethodError:
>>>> scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
>>>>       at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
>>>>       at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
>>>>       at akka.actor.RootActorPath.$div(ActorPath.scala:159)
>>>>       at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
>>>>       at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
>>>>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>>       at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>>>>       at scala.util.Try$.apply(Try.scala:192)
>>>>       at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>>>>       at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>>>       at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>>>       at scala.util.Success.flatMap(Try.scala:231)
>>>>       at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
>>>>       at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
>>>>       at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
>>>>       at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>>>>       at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>>>>       at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
>>>>       at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
>>>>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>>>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>>>       at scala.collection.immutable.Range.foreach(Range.scala:166)
>>>>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>>>>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
>>>>       at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
>>>>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
>>>>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
>>>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
>>>>       at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)
>>>>
>>>>
>>>> This is where I don't know what to exclude next. It seems that some
>>>> components (Akka?) need Scala 2.10.4, while Neo4j (Cypher) depends on
>>>> Scala 2.11.7.
>>>>
>>>> How can I make use of the Maven Shade Plugin in that case?
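Before deciding what to exclude or relocate next, it can help to check at runtime which jar each conflicting class is actually loaded from (`mvn dependency:tree` shows the declared side). The sketch below is a small diagnostic under stated assumptions: the class name `ClassOrigin` and the list of suspect classes are illustrative. It also checks for `scala.Predef$.$conforms`, which, as far as I know, exists from Scala 2.11 on but not in 2.10, matching the `NoSuchMethodError` in step (3).

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic: report which jar (or directory) a class is
// actually loaded from, and whether a given public method exists on it.
public final class ClassOrigin {

    private ClassOrigin() {}

    // Returns the jar/directory URL, "bootstrap" for JDK classes without a
    // code source, or "not found" if the class cannot be loaded.
    public static String locate(String className) {
        try {
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = c.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "bootstrap";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not found";
        }
    }

    // True if the class has a public method with this name (any signature).
    public static boolean hasMethod(String className, String methodName) {
        try {
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            for (Method m : c.getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] suspects = {
            "scala.Predef$",
            "akka.actor.ActorCell",
            "org.eclipse.jetty.server.Server",
            "com.sun.jersey.core.reflection.ReflectionHelper"
        };
        for (String name : suspects) {
            System.out.println(name + " -> " + locate(name));
        }
        // Present on the Scala 2.11 scala.Predef$, absent on 2.10.
        System.out.println("scala.Predef$.$conforms present: " + hasMethod("scala.Predef$", "$conforms"));
    }
}
```

Running `main()` on the test classpath shows, for each class, the single copy that actually wins, which is the one the failing code ends up calling.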
>>>>
>>>> Again, thank you!
>>>>
>>>> Cheers,
>>>> Martin
>>>>
>>>> [1]
>>>> http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>> (testMyExtensionWithFunctionFixture())
>>>>
>>>>
>>>> On 06.11.2015 16:17, Robert Metzger wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> what exactly were the issues you were facing with the dependency
>>>>> conflicts?
>>>>>
>>>>> There is a way around these issues, and it's called shading:
>>>>> https://maven.apache.org/plugins/maven-shade-plugin/
>>>>> In Flink we have several shaded modules (curator, hadoop). We could add a
>>>>> neo4j-harness-shaded module which relocates conflicting dependencies into
>>>>> a different namespace. That way, you can execute different versions of
>>>>> the same library (jetty, scala) at the same time.
>>>>> Since the module would only contain dependencies needed at test time, we
>>>>> could exclude it from releases.
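To make the "different namespace" idea concrete, here is a tiny, simplified model of the renaming a relocation rule performs on class names. It is not the Maven Shade Plugin's code: the plugin applies such rules to the class files and to every reference to them inside the shaded jar, and its exact matching rules may differ from this package-boundary match. The class `Relocation` and the prefix `org.example.neo4j.shaded` are made-up placeholders.

```java
// Simplified, hypothetical model of a shade relocation rule: class names in
// the `pattern` package (or its subpackages) are moved under `shadedPattern`.
public final class Relocation {
    private final String pattern;
    private final String shadedPattern;

    public Relocation(String pattern, String shadedPattern) {
        this.pattern = pattern;
        this.shadedPattern = shadedPattern;
    }

    public String apply(String className) {
        // Match on a package boundary so that "scala" does not capture "scalaz.*".
        if (className.startsWith(pattern + ".")) {
            return shadedPattern + className.substring(pattern.length());
        }
        return className;
    }

    public static void main(String[] args) {
        Relocation scala = new Relocation("scala", "org.example.neo4j.shaded.scala");
        for (String name : new String[] {"scala.Predef$", "scalaz.Monad", "akka.actor.ActorSystem"}) {
            System.out.println(name + " -> " + scala.apply(name));
        }
    }
}
```

After such a rewrite, Neo4j's code inside the shaded module would refer to its own relocated copy of `scala.*`, while Akka and Flink keep using the unrelocated Scala 2.10 classes.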
>>>>>
>>>>> Regarding Scala, it would be fine to execute the Neo4j tests only with
>>>>> Scala 2.11 builds. It's not hard to control this using Maven build
>>>>> profiles.
>>>>> Do you really need jetty? Is Neo4j also starting the web interface for
>>>>> the tests?
>>>>>
>>>>> Regards,
>>>>> Robert
>>>>>
>>>>>
>>>>> On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns
>>>>> <m.junghanns@mailbox.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I could use your input on testing the input format with Flink.
>>>>>>
>>>>>> As I already mentioned, Neo4j offers a dedicated module (neo4j-harness)
>>>>>> for unit testing server extensions / REST applications. The problem here
>>>>>> is that the dependencies of Flink conflict with the dependencies of
>>>>>> neo4j-harness (e.g. jetty, scala-library). I tried to figure out which
>>>>>> combination could run using the Maven exclude mechanism, but with no
>>>>>> success.
>>>>>>
>>>>>> So I thought about workarounds:
>>>>>>
>>>>>> (1) Michael Hunger (neo4j) started a project and invited me to
>>>>>> contribute [1]. What it does during tests is:
>>>>>> - download a neo4j-<version>.tar.gz into a temp folder
>>>>>> - extract and start a neo4j instance
>>>>>> - run tests
>>>>>> - stop and discard neo4j
>>>>>>
>>>>>> I like the concept, but I guess the problem is that it runs outside of
>>>>>> Maven, and downloading from external resources (especially in
>>>>>> travis-ci) could lead to problems.
>>>>>>
>>>>>> (2) I had a look into the other input formats. flink-hbase uses examples
>>>>>> instead of unit tests. This could be an option as long as there is no
>>>>>> clean solution for "real" unit testing.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Cheers, Martin
>>>>>>
>>>>>>
>>>>>> [1] https://github.com/jexp/neo4j-starter
>>>>>>
>>>>>>
>>>>>> On 03.11.2015 01:18, Stephan Ewen wrote:
>>>>>>
>>>>>>> Wow, very nice results :-)
>>>>>>>
>>>>>>> This input format alone is probably a very useful contribution, so I
>>>>>>> would open a pull request for it once you manage to get a few tests
>>>>>>> running.
>>>>>>>
>>>>>>> I know little about Neo4j; is there a way to read Cypher query results
>>>>>>> in parallel? (Most systems do not expose such an interface, but maybe
>>>>>>> Neo4j is special there.)
>>>>>>> I recall that at some point Martin Neumann asked about a way to create
>>>>>>> dense, contiguous, unique IDs for creating graphs that can be
>>>>>>> bulk-imported into Neo4j. There is code for that in the data set utils;
>>>>>>> this may be valuable for an output format.
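The idea behind dense, contiguous IDs over partitioned data can be shown without Flink: count the elements of each partition in a first pass, turn the counts into start offsets with a prefix sum, and number each partition's elements from its offset in a second pass. My understanding is that `DataSetUtils.zipWithIndex` works along these lines, but the plain-Java sketch below is only an illustration, not Flink's code; `DenseIds` is a hypothetical name and partitions are modelled as in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of two-pass dense ID assignment over partitioned data.
public final class DenseIds {

    private DenseIds() {}

    // Pass 1 output -> offsets: partition p starts after all elements of partitions 0..p-1.
    public static long[] offsets(long[] partitionSizes) {
        long[] result = new long[partitionSizes.length];
        long running = 0;
        for (int p = 0; p < partitionSizes.length; p++) {
            result[p] = running;
            running += partitionSizes[p];
        }
        return result;
    }

    // Pass 2: each partition numbers its own elements starting at its offset,
    // so IDs are unique, dense and contiguous (0 .. total-1) across partitions.
    public static <T> List<List<Long>> assign(List<List<T>> partitions) {
        long[] sizes = new long[partitions.size()];
        for (int p = 0; p < sizes.length; p++) {
            sizes[p] = partitions.get(p).size();
        }
        long[] start = offsets(sizes);
        List<List<Long>> ids = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<Long> local = new ArrayList<>();
            for (int i = 0; i < partitions.get(p).size(); i++) {
                local.add(start[p] + i);
            }
            ids.add(local);
        }
        return ids;
    }
}
```

Only the per-partition counts need to be exchanged between the two passes, which is why this scales to large data sets.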
>>>>>>>
>>>>>>> On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <m.junghanns@mailbox.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I wanted to give you a little update. I created a non-parallel
>>>>>>>> InputFormat which reads Cypher results from Neo4j into Tuples [1].
>>>>>>>> It can be used like the JDBCInputFormat:
>>>>>>>>
>>>>>>>> String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
>>>>>>>>
>>>>>>>> Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
>>>>>>>>     Neo4jInputFormat.buildNeo4jInputFormat()
>>>>>>>>         .setRestURI(restURI)
>>>>>>>>         .setCypherQuery(q)
>>>>>>>>         .setUsername("neo4j")
>>>>>>>>         .setPassword("test")
>>>>>>>>         .setConnectTimeout(1000)
>>>>>>>>         .setReadTimeout(1000)
>>>>>>>>         .finish();
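For readers unfamiliar with how a Cypher query travels over Neo4j's HTTP API: in Neo4j 2.x, one way is to POST a JSON document to the transactional endpoint (`/db/data/transaction/commit`), with a Basic `Authorization` header when authentication is enabled. Whether `Neo4jInputFormat` uses exactly this endpoint is an assumption; the sketch below (`CypherRequest` is a hypothetical name) only builds the request body and header value such a call would carry.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that builds the pieces of a request to Neo4j's
// transactional Cypher endpoint (POST .../db/data/transaction/commit).
public final class CypherRequest {

    private CypherRequest() {}

    // JSON body with a single statement: {"statements":[{"statement":"..."}]}
    public static String commitBody(String cypher) {
        return "{\"statements\":[{\"statement\":" + jsonString(cypher) + "}]}";
    }

    // Value for the HTTP Authorization header (Basic scheme).
    public static String basicAuth(String user, String password) {
        byte[] credentials = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(credentials);
    }

    // Minimal JSON string literal encoder: quotes, backslashes and control characters.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

With the query above, `commitBody(q)` yields `{"statements":[{"statement":"MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)"}]}`.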
>>>>>>>>
>>>>>>>> At the moment, a Neo4j instance needs to be up and running to run the
>>>>>>>> tests. I tried to get neo4j-harness [2] into the project, but there are
>>>>>>>> some dependency conflicts which I need to figure out.
>>>>>>>>
>>>>>>>> I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with Neo4j
>>>>>>>> running on the same machine. My dataset is the Polish Wiktionary dump
>>>>>>>> [3], which consists of 430,602 pages and 2,727,302 links. The protocol
>>>>>>>> is:
>>>>>>>>
>>>>>>>> 1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
>>>>>>>> 2) Convert Tuple2 into Tuple3<Integer, Integer, Double> to add an edge value
>>>>>>>> 3) Create Gelly graph from Tuple3, init vertex values to 1.0
>>>>>>>> 4) Run PageRank with beta=0.85 and 5 iterations
>>>>>>>>
>>>>>>>> This takes about 22 seconds on my machine, which is very promising.
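For reference, steps 3 and 4 compute the usual PageRank recurrence rank(v) = (1 - beta) / N + beta * sum over in-neighbours u of rank(u) / outDegree(u). Below is a plain-Java, single-machine sketch of that recurrence with beta = 0.85, 5 iterations and initial ranks of 1.0, mirroring the protocol above. It is not Gelly's implementation, whose normalisation and handling of vertices without out-edges may differ; `SimplePageRank` is a hypothetical name.

```java
import java.util.Arrays;

// Single-machine PageRank by power iteration over an edge list.
public final class SimplePageRank {

    private SimplePageRank() {}

    // src[i] -> dst[i] are the edges; vertex ids are 0 .. numVertices-1.
    public static double[] run(int numVertices, int[] src, int[] dst, double beta, int iterations) {
        int[] outDegree = new int[numVertices];
        for (int s : src) {
            outDegree[s]++;
        }
        double[] rank = new double[numVertices];
        Arrays.fill(rank, 1.0); // step 3: initial vertex values of 1.0
        for (int it = 0; it < iterations; it++) {
            double[] next = new double[numVertices];
            Arrays.fill(next, (1.0 - beta) / numVertices);
            for (int e = 0; e < src.length; e++) {
                // Each vertex spreads its current rank evenly over its out-edges.
                next[dst[e]] += beta * rank[src[e]] / outDegree[src[e]];
            }
            // Rank held by vertices without out-edges is not redistributed here.
            rank = next;
        }
        return rank;
    }
}
```

Starting from 1.0 per vertex, the total rank in a graph without dangling vertices moves toward 1 by a factor of beta per iteration, so after a few iterations the values approach a probability distribution.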
>>>>>>>>
>>>>>>>> Next steps are:
>>>>>>>> - OutputFormat
>>>>>>>> - Better unit tests (integrate neo4j-harness)
>>>>>>>> - Bigger graphs :)
>>>>>>>>
>>>>>>>> Any ideas and suggestions are of course highly appreciated :)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Martin
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] https://github.com/s1ck/flink-neo4j
>>>>>>>> [2] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>>>>>> [3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
>>>>>>>>
>>>>>>>> On 29.10.2015 14:51, Vasiliki Kalavri wrote:
>>>>>>>>
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> Martin, Martin, Alex (cc'ed) and I have started discussing the
>>>>>>>>> implementation of a Neo4j-Flink connector. I've opened a corresponding
>>>>>>>>> JIRA issue (FLINK-2941) containing an initial document [1], but we'd
>>>>>>>>> also like to share our ideas here to engage the community and get your
>>>>>>>>> feedback.
>>>>>>>>>
>>>>>>>>> We had a Skype call today, and I will try to summarize some of the key
>>>>>>>>> points here. The main use cases we see are the following:
>>>>>>>>>
>>>>>>>>> - Use Flink for ETL / creating the graph and then insert it into a graph
>>>>>>>>>   database, like Neo4j, for querying and search.
>>>>>>>>> - Query Neo4j on some topic or search the graph for patterns and extract
>>>>>>>>>   a subgraph, on which we'd then like to run some iterative graph
>>>>>>>>>   analysis task. This is where Flink/Gelly can help, by complementing the
>>>>>>>>>   querying (Neo4j) with efficient iterative computation.
>>>>>>>>>
>>>>>>>>> We all agreed that the main challenge is efficiently getting the data
>>>>>>>>> out of Neo4j and into Flink. There have been some attempts to do similar
>>>>>>>>> things with Neo4j and Spark, but the performance results are not very
>>>>>>>>> promising:
>>>>>>>>>
>>>>>>>>> - Mazerunner [2] uses HDFS for communication. We think it's not worth
>>>>>>>>>   going in this direction, as dumping the Neo4j database to HDFS and
>>>>>>>>>   then reading it back into Flink would probably be terribly slow.
>>>>>>>>> - In [3], you can see Michael Hunger's findings on using Neo4j's HTTP
>>>>>>>>>   interface to import data into Spark, run PageRank and then put data
>>>>>>>>>   back into Neo4j. It seems that this took more than 2 hours for a graph
>>>>>>>>>   with 125 million edges. The main bottlenecks appear to be (1) reading
>>>>>>>>>   the data as an RDD, which had to be performed in small batches to
>>>>>>>>>   avoid OOM errors, and (2) the PageRank computation itself, which seems
>>>>>>>>>   weird to me.
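Reading in small batches, as that experiment had to, can be expressed as repeated queries with SKIP and LIMIT over a stable ORDER BY, handing each batch to a consumer so that only one batch is held in memory at a time. A sketch in which the actual HTTP round trip is abstracted as a function; `BatchedReader` and its parameters are hypothetical names, not part of any library.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical batched reader: pulls rows in pages of batchSize so that at
// most one page is held in memory at any time.
public final class BatchedReader {

    private BatchedReader() {}

    // Appends paging clauses; the query should end with ORDER BY so pages are stable.
    public static String page(String orderedQuery, long skip, int limit) {
        return orderedQuery + " SKIP " + skip + " LIMIT " + limit;
    }

    // fetchBatch(skip, limit) stands in for one round trip to the database.
    // Returns the total number of rows handed to the sink.
    public static <T> long readAll(int batchSize,
                                   BiFunction<Long, Integer, List<T>> fetchBatch,
                                   Consumer<T> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        long skip = 0;
        while (true) {
            List<T> batch = fetchBatch.apply(skip, batchSize);
            batch.forEach(sink);
            skip += batch.size();
            if (batch.size() < batchSize) {
                return skip; // a short (or empty) page means the result is exhausted
            }
        }
    }
}
```

In a real reader, `fetchBatch` would send `page(query, skip, limit)` to the server and parse the returned rows.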
>>>>>>>>>
>>>>>>>>> We decided to experiment with Neo4j's HTTP interface and Flink, and
>>>>>>>>> we'll report back when we have some results.
>>>>>>>>>
>>>>>>>>> In the meantime, if you have any ideas on how we could speed up reading
>>>>>>>>> from Neo4j or any suggestions on approaches that I haven't mentioned,
>>>>>>>>> please feel free to reply to this e-mail or add your comments to the
>>>>>>>>> shared document.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Vasia.
>>>>>>>>>
>>>>>>>>> [1]: https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
>>>>>>>>> [2]: https://github.com/kbastani/neo4j-mazerunner
>>>>>>>>> [3]: https://gist.github.com/jexp/0dfad34d49a16000e804
