Hi Shivani,
Flink doesn't have enough memory to perform a hash join. You need to
provide Flink with more memory. You can either increase the
"taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction"
to a value greater than 0.7 (the default) and smaller than 1.0. The first config
variable allocates more overall memory for Flink; the second changes the
ratio between Flink managed memory (e.g. for hash joins) and user memory
(for your functions and Gelly's code).
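A rough way to see why raising either setting helps: the managed share of the heap is cut into fixed-size memory segments, and each memory-hungry operator (sort, hash table) gets a slice of them. The Java sketch below is a simplified model, not Flink's actual accounting. It assumes a 32 KiB segment size, ignores network buffers and task slots, and splits managed memory evenly across a hypothetical number of consumers. The 33-segment minimum is taken from the exception in the quoted mail.

```java
public class ManagedMemoryEstimate {

    // ASSUMPTION: 32 KiB memory segments; check the segment size of your Flink version.
    public static final long SEGMENT_SIZE_BYTES = 32L * 1024;

    // Minimum quoted in the exception: "Hash Join needs at least 33 memory segments."
    public static final long HASH_JOIN_MIN_SEGMENTS = 33;

    /** Simplified model: managed bytes = heap * fraction, cut into whole segments. */
    public static long segments(long heapMb, double fraction) {
        long managedBytes = (long) (heapMb * 1024L * 1024L * fraction);
        return managedBytes / SEGMENT_SIZE_BYTES;
    }

    /** ASSUMPTION: managed memory is split evenly across a number of memory consumers. */
    public static long segmentsPerConsumer(long heapMb, double fraction, int consumers) {
        return segments(heapMb, fraction) / consumers;
    }

    /** True if, under this model, one consumer's share is enough for a hash join. */
    public static boolean hashJoinFits(long heapMb, double fraction, int consumers) {
        return segmentsPerConsumer(heapMb, fraction, consumers) >= HASH_JOIN_MIN_SEGMENTS;
    }

    public static void main(String[] args) {
        long heapMb = 512;   // hypothetical heap size
        int consumers = 64;  // hypothetical number of operators/slots sharing managed memory
        for (double fraction : new double[] {0.7, 0.9}) {
            long perConsumer = segmentsPerConsumer(heapMb, fraction, consumers);
            System.out.printf("fraction %.1f: %d segments per consumer, hash join needs %d%n",
                    fraction, perConsumer, HASH_JOIN_MIN_SEGMENTS);
        }
    }
}
```

Under these assumptions, 1 MB of managed memory yields exactly 32 segments, one short of the 33 the hash join asks for, so a small heap or fraction shared by several operators runs out quickly.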
If you run this inside an IDE, the memory is configured automatically and
you don't have control over that at the moment. You could, however, start a
local cluster (./bin/start-local.sh) after you have adjusted your flink-conf.yaml
and run your programs against that configured cluster. You can do that
either from your IDE using a RemoteEnvironment or by submitting the
packaged JAR to the local cluster with the command-line tool (./bin/flink).
Hope that helps.
Cheers,
Max
On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <shghatge@gmail.com> wrote:
> Hello,
> I am working on a problem that implements the Adamic-Adar algorithm using
> Gelly.
> I am running into this exception for all the joins (including the ones that
> are part of the reduceOnNeighbors function):
>
> Too few memory segments provided. Hash Join needs at least 33 memory
> segments.
>
>
> The problem persists even when I comment out some of the joins.
>
> Even after using
>
> edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
>         .where(0, 1).equalTo(0, 1).with(new JoinEdge());
>
> as suggested by @AndraLungu, the problem persists.
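A join hint only chooses the strategy: BROADCAST_HASH_SECOND still builds a hash table, just from the broadcast second input, so it needs managed memory just like the default hash join. A minimal in-memory sketch of that build/probe pattern in plain Java (16+, for records), assuming simplified record types in place of Flink's Tuple3 and Edge; unlike Flink's implementation it uses a plain HashMap and never spills to disk:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch {

    /** Simplified stand-ins for Tuple3<Long, Long, Double> and Edge<Long, Double>. */
    public record Left(long src, long trg, double score) {}
    public record Right(long src, long trg, double weight) {}
    public record Joined(Left left, Right right) {}

    /** Composite key, mirroring where(0, 1).equalTo(0, 1). */
    private record Key(long src, long trg) {}

    /**
     * Hash join with the second input as build side, like BROADCAST_HASH_SECOND.
     * In Flink the hash table lives in managed memory and may spill; here it is a HashMap.
     */
    public static List<Joined> broadcastHashSecond(List<Left> first, List<Right> second) {
        // Build phase: the whole (broadcast) second input goes into the hash table.
        Map<Key, List<Right>> table = new HashMap<>();
        for (Right r : second) {
            table.computeIfAbsent(new Key(r.src(), r.trg()), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: stream the first input and look up matching keys.
        List<Joined> out = new ArrayList<>();
        for (Left l : first) {
            for (Right r : table.getOrDefault(new Key(l.src(), l.trg()), List.of())) {
                out.add(new Joined(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Left> scores = List.of(new Left(1, 2, 0.5), new Left(3, 4, 0.2));
        List<Right> edges = List.of(new Right(1, 2, 1.0), new Right(2, 1, 1.0));
        broadcastHashSecond(scores, edges).forEach(System.out::println);
    }
}
```

The key is ordered, so (2, 1) does not match (1, 2); only one pair is joined in the demo.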
>
> The code is
>
>
> DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
>
> // get the neighbors of each vertex into a HashSet for its value
> computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
>
> // get vertices with updated values for the final Graph, which will be used to get the Adamic-Adar edges
> Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
>         .where(0).equalTo(0).with(new JoinNeighborDegrees());
>
> Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
>         Graph.fromDataSet(Vertices, edges, env);
>
> // configure the vertex-centric iteration
> VertexCentricConfiguration parameters = new VertexCentricConfiguration();
> parameters.setName("Find Adamic Adar Edge Weights");
> parameters.setDirection(EdgeDirection.ALL);
>
> // run the vertex-centric iteration to get the Adamic-Adar edges into the vertex value
> updatedGraph = updatedGraph.runVertexCentricIteration(
>         new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
>
> // extract the vertices of the updated graph
> DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices =
>         updatedGraph.getVertices();
>
> // extract the list of edges from the vertex values
> DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
>
> // add up the partial weights of each edge
> edg = edg.groupBy(0, 1).reduce(new AdamGroup());
>
> // update the graph with the Adamic-Adar edges
> edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
>         .where(0, 1).equalTo(0, 1).with(new JoinEdge());
>
> Any idea how I could tackle this exception?
>
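For reference, the Adamic-Adar score of a pair (x, y) is the sum over their common neighbors z of 1 / log(deg z). The quoted pipeline computes it in two steps: each vertex emits a partial weight for pairs of its neighbors, then groupBy(0,1).reduce(...) adds the partials. A plain-Java sketch of those two steps, assuming an undirected graph given as a hypothetical adjacency map, degree = number of distinct neighbors, the natural log, and that AdamGroup sums the third field (its code is not shown):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AdamicAdarSketch {

    /** Stand-in for Tuple3<Long, Long, Double>: (src, trg, partial weight). */
    public record Partial(long src, long trg, double weight) {}

    /** Grouping key, like groupBy(0, 1). */
    public record Pair(long src, long trg) {}

    /** Every vertex z emits 1 / log(deg z) for each unordered pair of its neighbors (src < trg). */
    public static List<Partial> partialWeights(Map<Long, Set<Long>> neighbors) {
        List<Partial> out = new ArrayList<>();
        for (Map.Entry<Long, Set<Long>> entry : neighbors.entrySet()) {
            int degree = entry.getValue().size();
            if (degree < 2) {
                continue; // no neighbor pair to score (and 1 / log(1) would be infinite)
            }
            double weight = 1.0 / Math.log(degree);
            List<Long> sorted = new ArrayList<>(entry.getValue());
            Collections.sort(sorted);
            for (int i = 0; i < sorted.size(); i++) {
                for (int j = i + 1; j < sorted.size(); j++) {
                    out.add(new Partial(sorted.get(i), sorted.get(j), weight));
                }
            }
        }
        return out;
    }

    /** Plain-Java counterpart of groupBy(0, 1).reduce(...) that sums the partial weights. */
    public static Map<Pair, Double> sumByEdge(List<Partial> partials) {
        Map<Pair, Double> sums = new HashMap<>();
        for (Partial p : partials) {
            sums.merge(new Pair(p.src(), p.trg()), p.weight(), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical 4-cycle 1-2-3-4-1: vertices 1 and 3 share neighbors 2 and 4, and vice versa.
        Map<Long, Set<Long>> square = Map.of(
                1L, Set.of(2L, 4L), 2L, Set.of(1L, 3L),
                3L, Set.of(2L, 4L), 4L, Set.of(1L, 3L));
        sumByEdge(partialWeights(square)).forEach((pair, score) ->
                System.out.println(pair + " -> " + score));
    }
}
```

In the 4-cycle, both (1, 3) and (2, 4) end up with 2 / ln 2 (about 2.885), since each pair has two common neighbors of degree 2.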
