flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Martin Junghanns <m.jungha...@mailbox.org>
Subject Re: neo4j - Flink connector
Date Sat, 07 Nov 2015 16:28:08 GMT
Hi Robert,

Thank you for the hints. I tried to narrow down the error:

Flink version: 0.10-SNAPSHOT
Neo4j version: 2.3.0

I start with two dependencies:
flink-java
flink-gelly

(1) Add neo4j-harness and run basic example from Neo4j [1]
Leads to:

java.lang.ClassNotFoundException: org.eclipse.jetty.server.ConnectionFactory

(2) I excluded jetty-server from flink-java and flink-gelly
It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:

leads to: java.lang.NoSuchMethodError: 
org.eclipse.jetty.servlet.ServletContextHandler.<init>

(3) I excluded jetty-servlet from flink-java and flink-gelly
It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:

java.lang.NoSuchMethodError: scala.Predef$.$conforms()

(4) I excluded scala-library from flink-java and flink-gelly
It now uses scala-library:2.11.7 (was 2.10.4)

Now, the basic Neo4j example (without Flink runs).

Next, I added Flink to the mix and wrote a simple test using 
neo4j-harness features, ExecutionEnvironment and my InputFormat.
Leads to:

java.lang.NoSuchMethodError: 
scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
	at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
	at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
	at akka.actor.RootActorPath.$div(ActorPath.scala:159)
	at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
	at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
	at scala.util.Try$.apply(Try.scala:192)
	at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
	at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
	at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
	at scala.util.Success.flatMap(Try.scala:231)
	at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
	at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
	at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
	at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
	at 
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
	at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.immutable.Range.foreach(Range.scala:166)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
	at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
	at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
	at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
	at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
	at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
	at 
org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)

This is where I don't know what to exclude next. Seems that some 
components (akka?) need scala 2.10.4 and Neo4j (cypher) depends on scala 
2.11.7.

How can I make use of the maven shade plugin in that case?

Again, thank you!

Cheers,
Martin

[1] 
http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html 
(testMyExtensionWithFunctionFixture())


On 06.11.2015 16:17, Robert Metzger wrote:
> Hi Martin,
>
> what exactly were the issues you were facing with the dependency conflicts?
>
> There is a way around these issues, and its called shading:
> https://maven.apache.org/plugins/maven-shade-plugin/
> In Flink we have several shaded modules (curator, hadoop) .. we could add a
> neo4j-harness-shaded module which relocates conflicting dependencies into a
> different namespace. That way, you can execute different versions of the
> same library (jetty, scala) at the same time.
> Since the module contains module would only contain dependencies needed at
> test time, we could exclude it from releases.
>
> Regarding Scala, it would be fine to execute the neo4j tests only with
> scala 2.11 builds. Its not hard to control this using maven build profiles.
> Do you really need jetty? Is neo4j starting the web interface also for the
> tests?
>
> Regards,
> Robert
>
>
> On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns <m.junghanns@mailbox.org>
> wrote:
>
>> Hi,
>>
>> I could need your input on testing the input format with Flink.
>>
>> As I already mentioned, Neo4j offers a dedicated module (neo4j-harness)
>> for unit testing server extensions / REST applications. The problem here
>> is that the dependencies of Flink conflict with the dependencies of
>> neo4j-harness (e.g. jetty, scala-library). I tried to figure out what
>> combination could run using the maven exclude mechanism, but no success.
>>
>> So I thought about workarounds:
>>
>> (1) Michael Hunger (neo4j) started a project and invited me to
>> contribute [1]. What it does during tests is:
>> - download a neo4j-<version>.tar.gz into a temp folder
>> - extract and start a neo4j instance
>> - run tests
>> - stop and discard neo4j
>>
>> I like the concept, but I guess the problem is that it runs outside of
>> maven and I guess downloading from external resources (especially in
>> travis-ci) could lead to problems.
>>
>> (2) I had a look into the other input formats. flink-hbase uses examples
>> instead of unit tests. This could be an option as long as there is no
>> clean solution for "real" unit testing.
>>
>> What do you think?
>>
>> Cheers, Martin
>>
>>
>> [1] https://github.com/jexp/neo4j-starter
>>
>>
>> On 03.11.2015 01:18, Stephan Ewen wrote:
>>> Wow, very nice results :-)
>>>
>>> This input format alone is probably a very useful contribution, so I
>> would
>>> open a contribution there once you manage to get a few tests running.
>>>
>>> I know little about neo4j, is there a way to read cypher query results in
>>> parallel? (most systems do not expose such an interface, but maybe neo4j
>> is
>>> special there).
>>>
>>> I recall at some point in time Martin Neumann asking about a way to
>> create
>>> dense contiguous unique IDs for creating graphs that can be bulk-imported
>>> into neo4j. There is code for that in the data set utils, this may be
>>> valuable for an output format.
>>>
>>> On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <
>> m.junghanns@mailbox.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I wanted to give you a little update. I created a non-parallel
>>>> InputFormat which reads Cypher results from Neo4j into Tuples [1].
>>>> It can be used like the JDBCInputFormat:
>>>>
>>>> String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
>>>>
>>>> Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
>>>> Neo4jInputFormat.buildNeo4jInputFormat()
>>>>        .setRestURI(restURI)
>>>>        .setCypherQuery(q)
>>>>        .setUsername("neo4j")
>>>>        .setPassword("test")
>>>>        .setConnectTimeout(1000)
>>>>        .setReadTimeout(1000)
>>>>        .finish();
>>>>
>>>> Atm, to run the tests, a Neo4j instance needs to be up and running.
>>>> I tried to get neo4j-harness [2] into the project, but there are some
>>>> dependency conflicts which I need to figure out.
>>>>
>>>> I ran a first benchmark on my Laptop (4 cores, 12GB, SSD) with Neo4j
>>>> running on the same machine. My dataset is the polish wiki dump [2]
>>>> which consists of 430,602 pages and 2,727,302 links. The protocol is:
>>>>
>>>> 1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
>>>> 2) Convert Tuple2 into Tuple3<Integer, Integer, Double> for edge value
>>>> 3) Create Gelly graph from Tuple3, init vertex values to 1.0
>>>> 4) Run PageRank with beta=0.85 and 5 iterations
>>>>
>>>> This takes about 22 seconds on my machine which is very promising.
>>>>
>>>> Next steps are:
>>>> - OutputFormat
>>>> - Better Unit tests (integrate neo4j-harness)
>>>> - Bigger graphs :)
>>>>
>>>> Any ideas and suggestions are of course highly appreciated :)
>>>>
>>>> Best,
>>>> Martin
>>>>
>>>>
>>>> [1] https://github.com/s1ck/flink-neo4j
>>>> [2]
>> http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>> [3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
>>>>
>>>>
>>>>
>>>> On 29.10.2015 14:51, Vasiliki Kalavri wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> Martin, Martin, Alex (cc'ed) and myself have started discussing about
>>>>> implementing a neo4j-Flink connector. I've opened a corresponding JIRA
>>>>> (FLINK-2941) containing an initial document [1], but we'd also like to
>>>>> share our ideas here to engage the community and get your feedback.
>>>>>
>>>>> We've had a skype call today and I will try to summarize some of the
>> key
>>>>> points here. The main use-cases we see are the following:
>>>>>
>>>>> - Use Flink for ETL / creating the graph and then insert it to a graph
>>>>> database, like neo4j, for querying and search.
>>>>> - Query neo4j on some topic or search the graph for patterns and
>> extract a
>>>>> subgraph, on which we'd then like to run some iterative graph analysis
>>>>> task. This is where Flink/Gelly can help, by complementing the querying
>>>>> (neo4j) with efficient iterative computation.
>>>>>
>>>>> We all agreed that the main challenge is efficiently getting the data
>> out
>>>>> of neo4j and into Flink. There have been some attempts to do similar
>>>>> things
>>>>> with neo4j and Spark, but the performance results are not very
>> promising:
>>>>>
>>>>> - Mazerunner [2] is using HDFS for communication. We think that's it's
>> not
>>>>> worth it going towards this direction, as dumping the neo4j database
to
>>>>> HDFS and then reading it back to Flink would probably be terribly slow.
>>>>> - In [3], you can see Michael Hunger's findings on using neo's HTTP
>>>>> interface to import data into Spark, run PageRank and then put data
>> back
>>>>> into neo4j. It seems that this took > 2h for a 125m edge graph. The
>> main
>>>>> bottlenecks appear to be (1) reading the data as an RDD => this had
to
>> be
>>>>> performed into small batches to avoid OOM errors and (2) PageRank
>>>>> computation itself, which seems weird to me.
>>>>>
>>>>> We decided to experiment with neo4j HTTP and Flink and we'll report
>> back
>>>>> when we have some results.
>>>>>
>>>>> In the meantime, if you have any ideas on how we could speed up reading
>>>>> from neo4j or any suggestion on approaches that I haven't mentioned,
>>>>> please
>>>>> feel free to reply to this e-mail or add your comment in the shared
>>>>> document.
>>>>>
>>>>> Cheers,
>>>>> -Vasia.
>>>>>
>>>>> [1]:
>>>>>
>>>>>
>> https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
>>>>> [2]: https://github.com/kbastani/neo4j-mazerunner
>>>>> [3]: https://gist.github.com/jexp/0dfad34d49a16000e804
>>>>>
>>>>>
>>>
>>
>

Mime
View raw message