From: Robert Metzger
Date: Thu, 12 Nov 2015 12:51:22 +0100
Subject: Re: neo4j - Flink connector
To: "dev@flink.apache.org"
Reply-To: dev@flink.apache.org
In-Reply-To: <56438B0A.8090502@mailbox.org>
References: <5634F181.1010309@mailbox.org> <563CC2BA.4010501@mailbox.org> <563E2698.2000101@mailbox.org> <56438B0A.8090502@mailbox.org>
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a11403218fb327205245690eb

--001a11403218fb327205245690eb
Content-Type: text/plain; charset=UTF-8

Sorry for the delay.
So the plan of this work is to add a neo4j connector to Flink, right?

While looking at the pom files of neo4j, I found that it's GPLv3 licensed,
and Apache projects cannot depend on or link against GPL code [1].
So we cannot make the module part of the Flink source.
However, it's actually quite easy to publish code to Maven Central, so you
could release it on your own.
If that is too much work for you, I can also start a GitHub project like
"flink-gpl" with access to Maven Central where we can release stuff like
this.

Is this repository [2] your current work in progress on the dependency
issue?
Maybe the neo4j dependency expects Scala 2.11 and there is no Scala 2.10
build available. In this case, we could require Flink users to use the
Scala 2.11 build of Flink when they want to use neo4j.
I think I can help you much better as soon as I have your current pom file
+ code.

[1] http://www.apache.org/legal/resolved.html#category-a
[2] https://github.com/s1ck/flink-neo4j

On Wed, Nov 11, 2015 at 7:38 PM, Martin Junghanns wrote:

> Hi,
>
> I am a bit stuck with that dependency problem. Any help would be
> appreciated as I would like to continue working on the formats. Thanks!
>
> Best,
> Martin
>
>
> On 07.11.2015 17:28, Martin Junghanns wrote:
>
>> Hi Robert,
>>
>> Thank you for the hints. I tried to narrow down the error:
>>
>> Flink version: 0.10-SNAPSHOT
>> Neo4j version: 2.3.0
>>
>> I start with two dependencies:
>> flink-java
>> flink-gelly
>>
>> (1) Add neo4j-harness and run basic example from Neo4j [1]
>> Leads to:
>>
>> java.lang.ClassNotFoundException:
>> org.eclipse.jetty.server.ConnectionFactory
>>
>> (2) I excluded jetty-server from flink-java and flink-gelly
>> It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
>> Leads to:
>>
>> java.lang.NoSuchMethodError:
>> org.eclipse.jetty.servlet.ServletContextHandler.
>>
>> (3) I excluded jetty-servlet from flink-java and flink-gelly
>> It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
>> Leads to:
>>
>> java.lang.NoSuchMethodError: scala.Predef$.$conforms()
>>
>> (4) I excluded scala-library from flink-java and flink-gelly
>> It now uses scala-library:2.11.7 (was 2.10.4)
>>
>> Now, the basic Neo4j example (without Flink) runs.
>>
>> Next, I added Flink to the mix and wrote a simple test using
>> neo4j-harness features, ExecutionEnvironment and my InputFormat.
>> Leads to:
>>
>> java.lang.NoSuchMethodError:
>> scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
>>     at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
>>     at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
>>     at akka.actor.RootActorPath.$div(ActorPath.scala:159)
>>     at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
>>     at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>>     at scala.util.Try$.apply(Try.scala:192)
>>     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>     at scala.util.Success.flatMap(Try.scala:231)
>>     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
>>     at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
>>     at
>>     akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
>>     at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>>     at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>>     at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
>>     at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>     at scala.collection.immutable.Range.foreach(Range.scala:166)
>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
>>     at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
>>     at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)
>>
>>
>> This is where I don't know what to exclude next. It seems that some
>> components (akka?) need Scala 2.10.4 and Neo4j (cypher) depends on Scala
>> 2.11.7.
>>
>> How can I make use of the maven shade plugin in that case?
>>
>> Again, thank you!
>>
>> Cheers,
>> Martin
>>
>> [1]
>> http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>> (testMyExtensionWithFunctionFixture())
>>
>>
>> On 06.11.2015 16:17, Robert Metzger wrote:
>>
>>> Hi Martin,
>>>
>>> what exactly were the issues you were facing with the dependency
>>> conflicts?
>>>
>>> There is a way around these issues, and it's called shading:
>>> https://maven.apache.org/plugins/maven-shade-plugin/
>>> In Flink we have several shaded modules (curator, hadoop). We could add a
>>> neo4j-harness-shaded module which relocates conflicting dependencies into a
>>> different namespace. That way, you can execute different versions of the
>>> same library (jetty, scala) at the same time.
>>> Since the module would only contain dependencies needed at
>>> test time, we could exclude it from releases.
>>>
>>> Regarding Scala, it would be fine to execute the neo4j tests only with
>>> Scala 2.11 builds. It's not hard to control this using Maven build
>>> profiles.
>>> Do you really need jetty? Is neo4j starting the web interface also for the
>>> tests?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns wrote:
>>>
>>>> Hi,
>>>>
>>>> I could use your input on testing the input format with Flink.
>>>>
>>>> As I already mentioned, Neo4j offers a dedicated module (neo4j-harness)
>>>> for unit testing server extensions / REST applications. The problem here
>>>> is that the dependencies of Flink conflict with the dependencies of
>>>> neo4j-harness (e.g. jetty, scala-library).
>>>> I tried to figure out which
>>>> combination could run using the Maven exclude mechanism, but without success.
>>>>
>>>> So I thought about workarounds:
>>>>
>>>> (1) Michael Hunger (neo4j) started a project and invited me to
>>>> contribute [1]. What it does during tests is:
>>>> - download a neo4j-.tar.gz into a temp folder
>>>> - extract and start a neo4j instance
>>>> - run tests
>>>> - stop and discard neo4j
>>>>
>>>> I like the concept, but I guess the problem is that it runs outside of
>>>> Maven, and downloading from external resources (especially in
>>>> Travis CI) could lead to problems.
>>>>
>>>> (2) I had a look at the other input formats. flink-hbase uses examples
>>>> instead of unit tests. This could be an option as long as there is no
>>>> clean solution for "real" unit testing.
>>>>
>>>> What do you think?
>>>>
>>>> Cheers, Martin
>>>>
>>>>
>>>> [1] https://github.com/jexp/neo4j-starter
>>>>
>>>>
>>>> On 03.11.2015 01:18, Stephan Ewen wrote:
>>>>
>>>>> Wow, very nice results :-)
>>>>>
>>>>> This input format alone is probably a very useful contribution, so I would
>>>>> open a contribution there once you manage to get a few tests running.
>>>>>
>>>>> I know little about neo4j. Is there a way to read Cypher query results in
>>>>> parallel? (Most systems do not expose such an interface, but maybe neo4j is
>>>>> special there.)
>>>>>
>>>>> I recall at some point in time Martin Neumann asking about a way to create
>>>>> dense contiguous unique IDs for creating graphs that can be bulk-imported
>>>>> into neo4j. There is code for that in the data set utils; this may be
>>>>> valuable for an output format.
>>>>>
>>>>> On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <m.junghanns@mailbox.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I wanted to give you a little update. I created a non-parallel
>>>>>> InputFormat which reads Cypher results from Neo4j into Tuples [1].
>>>>>> It can be used like the JDBCInputFormat:
>>>>>>
>>>>>> String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
>>>>>>
>>>>>> Neo4jInputFormat> neoInput =
>>>>>>     Neo4jInputFormat.buildNeo4jInputFormat()
>>>>>>         .setRestURI(restURI)
>>>>>>         .setCypherQuery(q)
>>>>>>         .setUsername("neo4j")
>>>>>>         .setPassword("test")
>>>>>>         .setConnectTimeout(1000)
>>>>>>         .setReadTimeout(1000)
>>>>>>         .finish();
>>>>>>
>>>>>> At the moment, a Neo4j instance needs to be up and running to run the tests.
>>>>>> I tried to get neo4j-harness [2] into the project, but there are some
>>>>>> dependency conflicts which I need to figure out.
>>>>>>
>>>>>> I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with Neo4j
>>>>>> running on the same machine. My dataset is the Polish wiki dump [3],
>>>>>> which consists of 430,602 pages and 2,727,302 links. The protocol is:
>>>>>>
>>>>>> 1) Read edge ids from cold Neo4j into Tuple2
>>>>>> 2) Convert Tuple2 into Tuple3 for edge value
>>>>>> 3) Create Gelly graph from Tuple3, init vertex values to 1.0
>>>>>> 4) Run PageRank with beta=0.85 and 5 iterations
>>>>>>
>>>>>> This takes about 22 seconds on my machine, which is very promising.
>>>>>>
>>>>>> Next steps are:
>>>>>> - OutputFormat
>>>>>> - Better unit tests (integrate neo4j-harness)
>>>>>> - Bigger graphs :)
>>>>>>
>>>>>> Any ideas and suggestions are of course highly appreciated :)
>>>>>>
>>>>>> Best,
>>>>>> Martin
>>>>>>
>>>>>>
>>>>>> [1] https://github.com/s1ck/flink-neo4j
>>>>>> [2] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>>>> [3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
>>>>>>
>>>>>>
>>>>>> On 29.10.2015 14:51, Vasiliki Kalavri wrote:
>>>>>>
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> Martin, Martin, Alex (cc'ed) and I have started discussing
>>>>>>> implementing a neo4j-Flink connector.
>>>>>>> I've opened a corresponding JIRA
>>>>>>> (FLINK-2941) containing an initial document [1], but we'd also like to
>>>>>>> share our ideas here to engage the community and get your feedback.
>>>>>>>
>>>>>>> We've had a Skype call today and I will try to summarize some of the key
>>>>>>> points here. The main use cases we see are the following:
>>>>>>>
>>>>>>> - Use Flink for ETL / creating the graph and then insert it into a graph
>>>>>>> database, like neo4j, for querying and search.
>>>>>>> - Query neo4j on some topic or search the graph for patterns and extract a
>>>>>>> subgraph, on which we'd then like to run some iterative graph analysis
>>>>>>> task. This is where Flink/Gelly can help, by complementing the querying
>>>>>>> (neo4j) with efficient iterative computation.
>>>>>>>
>>>>>>> We all agreed that the main challenge is efficiently getting the data out
>>>>>>> of neo4j and into Flink. There have been some attempts to do similar things
>>>>>>> with neo4j and Spark, but the performance results are not very promising:
>>>>>>>
>>>>>>> - Mazerunner [2] is using HDFS for communication. We think that it's not
>>>>>>> worth going in this direction, as dumping the neo4j database to
>>>>>>> HDFS and then reading it back into Flink would probably be terribly slow.
>>>>>>> - In [3], you can see Michael Hunger's findings on using neo's HTTP
>>>>>>> interface to import data into Spark, run PageRank and then put data back
>>>>>>> into neo4j. It seems that this took > 2h for a 125m edge graph. The main
>>>>>>> bottlenecks appear to be (1) reading the data as an RDD => this had to be
>>>>>>> performed in small batches to avoid OOM errors and (2) the PageRank
>>>>>>> computation itself, which seems weird to me.
>>>>>>>
>>>>>>> We decided to experiment with neo4j HTTP and Flink and we'll report back
>>>>>>> when we have some results.
>>>>>>>
>>>>>>> In the meantime, if you have any ideas on how we could speed up reading
>>>>>>> from neo4j or any suggestion on approaches that I haven't mentioned, please
>>>>>>> feel free to reply to this e-mail or add your comment in the shared
>>>>>>> document.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Vasia.
>>>>>>>
>>>>>>> [1]: https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
>>>>>>> [2]: https://github.com/kbastani/neo4j-mazerunner
>>>>>>> [3]: https://gist.github.com/jexp/0dfad34d49a16000e804

--001a11403218fb327205245690eb--