flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Re: Re: Flink Gelly
Date Fri, 07 Oct 2016 11:31:05 GMT
Hi,

the exception
> java.lang.RuntimeException: Memory ran out. Compaction failed.

says that the hash table ran out of memory. Gelly is implemented on top of
Flink's DataSet API.
So this would rather be a problem with DataSet than Gelly.
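
To put the numbers from the exception message in perspective, here is a minimal sketch that converts them. It assumes that the reported values are byte counts and that the memory segment size is Flink's default of 32768 bytes; neither is stated in this thread, and the class and method names are made up for illustration.

```java
import java.util.Locale;

// Hypothetical helper, not part of Flink: converts the sizes reported in the
// "Memory ran out. Compaction failed." message into MiB and memory segments.
class HashTableMemory {
    // Assumption: Flink's default memory segment (page) size in bytes.
    static final long SEGMENT_SIZE_BYTES = 32768L;

    static double toMebibytes(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    static long toSegments(long bytes) {
        return bytes / SEGMENT_SIZE_BYTES;
    }

    public static void main(String[] args) {
        long overallMemory = 45613056L;   // "Overall memory" from the exception
        long partitionMemory = 33685504L; // "Partition memory" from the exception

        System.out.println(String.format(Locale.ROOT,
                "overall:   %.3f MiB (%d segments)",
                toMebibytes(overallMemory), toSegments(overallMemory)));
        System.out.println(String.format(Locale.ROOT,
                "partition: %.3f MiB (%d segments)",
                toMebibytes(partitionMemory), toSegments(partitionMemory)));
    }
}
```

Under these assumptions the hash table had 43.5 MiB (1392 segments) in total, of which 32.125 MiB (1028 segments) were partition memory. That is a small fraction of the 16 GB the workstation has.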

I think Vasia is right about the memory configuration. Providing more
memory usually is the solution to this problem.
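
To illustrate Vasia's point that the vertex values can grow big: the posted gather function keeps every path it has received, keyed by superstep. The sketch below is plain Java without Flink and mimics only the path propagation (not the path combination step) on a made-up, fully connected graph. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Flink code: every vertex forwards the paths it
// received in the previous superstep, and receivers drop paths that already
// contain their own ID (as in the posted updateVertex method).
class PathGrowth {

    /** Total number of paths stored across all vertices, per superstep. */
    static List<Integer> storedPathsPerSuperstep(int numVertices, int supersteps) {
        // Single-letter vertex IDs "a", "b", ... (so numVertices must be <= 26).
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < numVertices; i++) {
            ids.add(String.valueOf((char) ('a' + i)));
        }

        Map<String, List<String>> previous = new HashMap<>();
        List<Integer> totals = new ArrayList<>();

        for (int step = 1; step <= supersteps; step++) {
            Map<String, List<String>> received = new HashMap<>();
            for (String id : ids) {
                received.put(id, new ArrayList<>());
            }
            for (String sender : ids) {
                // Scatter: own ID in the first superstep, extended paths afterwards.
                List<String> outgoing = new ArrayList<>();
                if (step == 1) {
                    outgoing.add(sender);
                } else {
                    for (String path : previous.get(sender)) {
                        outgoing.add(path + ";" + sender);
                    }
                }
                // Gather: every other vertex is a neighbour in a complete graph.
                for (String target : ids) {
                    if (target.equals(sender)) {
                        continue;
                    }
                    for (String path : outgoing) {
                        if (!path.contains(target)) {
                            received.get(target).add(path);
                        }
                    }
                }
            }
            int total = 0;
            for (List<String> paths : received.values()) {
                total += paths.size();
            }
            totals.add(total);
            previous = received;
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(storedPathsPerSuperstep(5, 4));
    }
}
```

For 5 fully connected vertices the per-superstep totals are 20, 60, 120 and 120 stored paths. Since the posted code keeps the lists of all supersteps in the vertex value, the state held in the solution set grows quickly with graph size and connectivity.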
Another thing you can try is to keep the hash table in JVM heap memory instead of
managed memory.
See the option for setting the solution set to unmanaged memory in the docs [1].
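
A minimal sketch of that option, assuming the Gelly version used in the posted code, where the functions are called ScatterFunction and GatherFunction (other releases may name or order them differently). The wrapper class and method are hypothetical; setSolutionSetUnmanagedMemory is the configuration setter described in the docs [1]. The snippet needs flink-gelly on the classpath.

```java
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;

// Hypothetical wrapper, for illustration only.
final class UnmanagedSolutionSetExample {

    static <K, VV, EV, M> Graph<K, VV, EV> runWithUnmanagedSolutionSet(
            Graph<K, VV, EV> graph,
            ScatterFunction<K, VV, M, EV> scatterFunction,
            GatherFunction<K, VV, M> gatherFunction,
            int maxIterations) {

        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        // Keep the solution set as plain objects on the JVM heap instead of
        // in Flink's managed memory.
        parameters.setSolutionSetUnmanagedMemory(true);

        return graph.runScatterGatherIteration(
                scatterFunction, gatherFunction, maxIterations, parameters);
    }
}
```

With the classes from the posted code this would be called as runWithUnmanagedSolutionSet(graph, new PathMessageFunction(), new PathUpdateFunction(), maxIterations), where graph and maxIterations stand for whatever the program already uses.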

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#iterative-graph-processing

2016-10-07 13:21 GMT+02:00 <dfki@web.de>:

>
>
> Hi again,
>
> I tried changing some configuration settings and set the amount of available
> memory to 4096 MB,
> but it made no difference at all. Furthermore, I monitored the RAM usage of
> the JVM, and it never goes beyond 3 GB.
>
> I still don't think that memory is the problem. To me it looks like
> an internal
> problem with Gelly.
>
> Best,
> Dennis
>
>
> Sent: Friday, 07 October 2016 at 12:29
> From: "Vasiliki Kalavri" <vasilikikalavri@gmail.com>
> To: dev@flink.apache.org
> Subject: Re: Re: Flink Gelly
> Hi Dennis,
>
> On 7 October 2016 at 11:29, <dfki@web.de> wrote:
>
> > Hi Vasia,
> >
> > thanks for your reply.
> >
> > Currently I am testing it on my normal workstation (16 GB RAM), but I also
> > tried it on our cluster.
> > Both fail at the same number of nodes, so I guess it has something
> > to do with Gelly
> > or with the properties.
> >
> > The configured memory is the default. I did not change it because I thought
> > that Flink was not the problem,
> > but I might be wrong.
> >
>
> The default configured memory is only 512MB. If you have 16GB but you
> don't let Flink know about it, it won't use it.
> I see that your vertex values are Tuple2's of HashMap's of Lists and I
> suspect these grow big.
> If you're running the program from your IDE make sure to add a VM option.
> If you're running the program from the command line, make sure to edit the
> flink-conf.yaml file. You can find the available configuration options and
> what they mean in the docs [1].
>
> I hope this helps,
> -Vasia.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html
>
>
>
>
> >
> > The input should not be much... I wrote an API for Virtuoso which
> > requests an RDF graph, but
> > I limited it to 10 data sets only.
> >
> > This is my code; it is a bit messy and there might be room for improvement:
> >
> >
> > public static final class PathMessageFunction
> >         extends ScatterFunction<String,
> >                 Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>,
> >                 List<String>, String> {
> >
> >     @Override
> >     public void sendMessages(
> >             Vertex<String, Tuple2<HashMap<String, List<String>>,
> >                     HashMap<Integer, List<String>>>> vertex)
> >             throws Exception {
> >
> >         // The list "path" collects the IDs of the vertices a
> >         // message was sent to.
> >         List<String> path = new ArrayList<String>();
> >         if (super.getSuperstepNumber() == 1) {
> >             path.add(vertex.getId());
> >         }
> >         if (super.getSuperstepNumber() > 1) {
> >             for (String values : vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
> >                 path.add(values + ";" + vertex.getId());
> >             }
> >         }
> >
> >         // The path list is sent to the neighbouring nodes.
> >         for (Edge<String, String> edge : getEdges()) {
> >             sendMessageTo(edge.getTarget(), path);
> >         }
> >     }
> > }
> >
> >
> >
> > public static final class PathUpdateFunction
> >         extends GatherFunction<String,
> >                 Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>,
> >                 List<String>> {
> >
> >     @Override
> >     public void updateVertex(
> >             Vertex<String, Tuple2<HashMap<String, List<String>>,
> >                     HashMap<Integer, List<String>>>> vertex,
> >             MessageIterator<List<String>> messenger)
> >             throws Exception {
> >
> >         List<String> newValues = new ArrayList<String>();
> >
> >         // The path list that was sent as a message is also stored within the
> >         // vertex value, so the paths are saved to a new list "newValues".
> >         // This list should not contain the ID of the vertex itself, to
> >         // avoid cycles.
> >         for (List<String> msg : messenger) {
> >             for (String value : msg) {
> >                 if (!value.contains(vertex.getId())) {
> >                     newValues.add(value);
> >                 }
> >             }
> >         }
> >
> >         // The HashMap with the new and old values for
> >         // the setNewVertexValue function.
> >         HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
> >         newHashMap.put(super.getSuperstepNumber(), newValues);
> >
> >         HashMap<String, List<String>> multiPaths = new HashMap<String, List<String>>();
> >
> >         // Here it gets a bit complicated... I try to analyze the given
> >         // paths for possible combinations of them. For example, given the
> >         // path "a;b;c" and the path "c;d;e", I predict that "a;b;c;d;e"
> >         // should also be possible.
> >         for (int i = 0; i < oriList.size(); i++) {
> >
> >             String oriTemp = oriList.get(i);
> >             String destTemp = destList.get(i);
> >
> >             String oriDest = oriTemp + destTemp;
> >
> >             List<String> tempList = new ArrayList<String>();
> >             List<String> setsWithOrigin = new ArrayList<String>();
> >             List<String> setsWithDestination = new ArrayList<String>();
> >             for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
> >                 for (String value : entry.getValue()) {
> >                     if (value.contains(oriTemp)) {
> >                         setsWithOrigin.add(value);
> >                     }
> >                     if (value.contains(destTemp)) {
> >                         setsWithDestination.add(value);
> >                     }
> >                 }
> >             }
> >             for (String originIter : setsWithOrigin) {
> >                 for (String destinationIter : setsWithDestination) {
> >                     String concat = "";
> >                     if (originIter.indexOf(oriTemp) == 0
> >                             && destinationIter.indexOf(destTemp) == 0) {
> >                         // Reverse the destination path character by character.
> >                         String reverse = destinationIter;
> >                         if (destinationIter.length() > 1) {
> >                             reverse = "";
> >                             int d = destinationIter.length();
> >                             for (int a = 0; a < destinationIter.length(); a++) {
> >                                 reverse = reverse + destinationIter.substring(d - 1, d);
> >                                 d--;
> >                             }
> >                         }
> >                         concat = originIter + ";" + vertex.getId() + ";" + reverse;
> >                     }
> >                     if (isFormatValid(concat) && concat.length() > 0) {
> >                         if (!tempList.contains(concat)) {
> >                             tempList.add(concat);
> >                         }
> >                     }
> >                 }
> >             }
> >             multiPaths.put(oriDest, tempList);
> >         }
> >
> >         // The combined paths are also saved into a HashMap, which is
> >         // additionally set as a vertex value.
> >         // Later the paths are filtered for redundancy.
> >         Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 =
> >                 new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(
> >                         multiPaths, newHashMap);
> >
> >         setNewVertexValue(testTuple3);
> >     }
> > }
> >
> >
> > Let me know if you need any further information.
> > Thanks in advance.
> >
> > All the best,
> > Dennis
> >
> >
> > Sent: Thursday, 06 October 2016 at 15:22
> > From: "Vasiliki Kalavri" <vasilikikalavri@gmail.com>
> > To: dev@flink.apache.org
> > Subject: Re: Flink Gelly
> > Hi Dennis,
> >
> > can you give us some details about your setup? e.g. where you are running
> > your job, your input size, the configured memory, etc. It would also be
> > helpful if you could share your code. Getting an out of memory error with
> > just 100 nodes seems weird.
> >
> > Best,
> > -Vasia.
> >
> > On 6 October 2016 at 13:29, <dfki@web.de> wrote:
> >
> > >
> > > Dear ladies and gentlemen,
> > >
> > > I have a problem using Gelly in Flink. Currently I am loading a Virtuoso
> > > graph into
> > > Flink's Gelly and I want to analyze it for the different paths one can
> > > take to link
> > > the different nodes. Therefore I am using the ScatterGatherIteration.
> > > However, my code only works with about ten to twenty nodes. When I try to
> > > load
> > > a hundred nodes, the following error occurs:
> > >
> > > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory: 33685504 Message: null
> > >     at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
> > >     at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
> > >     at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
> > >     at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
> > >     at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
> > >     at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> > >     at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> > >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> > >     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> > >     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> > >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > I googled it a bit, and this problem seems to occur often when
> > > using Gelly. I hope you have some ideas or approaches for how I can handle
> > > this error.
> > >
> > > Thank you in advance!
> > > All the best,
> > > Dennis
> > >
> >
>
