flink-user mailing list archives

From Miguel Coimbra <miguel.e.coim...@gmail.com>
Subject Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.
Date Mon, 14 Nov 2016 17:33:11 GMT
Hello,

I believe I have figured this out.

First, I tried Andrey Melentyev's suggestion of executing with Apache
Flink 1.1.3, both with the default conf/flink-conf.yaml parameters and
with some changes to provide additional memory. However, the same error
occurred.

Note: I changed my project's pom.xml and generated the .jar again using
Maven.
I also copied the new .jar to both Docker instances.

The test machine has 256 GB of RAM, and the scenario consists of two Docker
containers.
I have attached the relevant parts of the JobManager and TaskManager logs.
Regarding memory in the TaskManager log, I looked at a couple of
executions and noticed something strange:

2016-11-14 15:48:45,256 INFO
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
64 MB for network buffer pool (number of memory segments: 2048, bytes per
segment: 32768).
2016-11-14 15:48:45,413 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Limiting
managed memory to 0.7 of the currently free heap space (310 MB), memory
will be allocated lazily.

After that, I looked at the start of the TaskManager log and found this:

2016-11-14 15:48:38,843 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  Starting
TaskManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
2016-11-14 15:48:38,843 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  Current
user: flink
2016-11-14 15:48:38,844 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  JVM:
OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
2016-11-14 15:48:38,844 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  Maximum
heap size: 512 MiBytes
2016-11-14 15:48:38,844 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  JAVA_HOME:
/usr/lib/jvm/java-1.8-openjdk/jre
2016-11-14 15:48:38,850 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop
version: 2.7.2
2016-11-14 15:48:38,850 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -  JVM
Options:
2016-11-14 15:48:38,850 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
-XX:+UseG1GC


2016-11-14 15:48:38,850 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
-Xms512M
2016-11-14 15:48:38,850 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
-Xmx512M
2016-11-14 15:48:38,850 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
-XX:MaxDirectMemorySize=8388607T

It seems the TaskManager is running with only 512 MB, which is the default,
despite my having edited the flink-conf.yaml file before invoking
the program on the cluster.
The JobManager log showed the same thing: it was using the default
256 MB instead of my 1024 MB.
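As a rough sanity check (my own back-of-the-envelope arithmetic, not an exact
reproduction of Flink's internal accounting), the defaults leave only a small
managed-memory pool to divide among operators:

```shell
# Using the numbers from the TaskManager log above:
# managed memory is limited to 0.7 of the currently free heap (310 MB),
# and each memory segment is 32768 bytes (32 KB).
free_heap_mb=310
managed_mb=$(( free_heap_mb * 7 / 10 ))    # ~217 MB of managed memory
segments=$(( managed_mb * 1024 / 32 ))     # total 32 KB segments in the pool
echo "managed=${managed_mb}MB segments=${segments}"
# That pool is shared by all memory-consuming operators in the job, and with
# lazy allocation the heap actually free at allocation time may be far less,
# so a single hash table can still end up with fewer than the 33 segments
# it needs.
```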

To recap, I built the Docker Flink image as follows (Dockerfile
attached):

cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*
build.sh*  docker-compose-bluemix.yml
./build.sh

The only file I changed among those was the Dockerfile.
These files were obtained from the Flink repository.
I used docker-compose up to start the standalone cluster:

screen
cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*
build.sh*  docker-compose-bluemix.yml  docker-compose.yml
docker-entrypoint.sh*
docker-compose up

Then I accessed each Docker instance:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
/bin/sh
docker exec -it $(docker ps --filter
name=dockerflinkimagebuilder_taskmanager_1 --format={{.ID}}) /bin/sh

While inside each of those, I started a bash shell and changed the config
file like so:

bash
cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
vi flink-conf.yaml

I have edited (on both the JobManager and the TaskManager) the following
settings:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 4096

# The number of buffers for the network stack.
taskmanager.network.numberOfBuffers: 4096

It seems that changes I make to the flink-conf.yaml file are only
reflected after I kill the cluster and run docker-compose up again.

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
flink run -m 707a534982e6:6123 -c flink.graph.example.App
/home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
/home/myuser/com-dblp.ungraph.txt

The cluster now started with the correct memory values, but the result was
the same (it is in the logs).
However, I then doubled the memory again, so that I had:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 2048

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 8192

After this, I killed the cluster (CTRL+C) in the screen session that had
started it (graceful exit).
This time, after starting again with docker-compose up, I launched the
program again and it worked!

However, there is something I don't understand, perhaps because I am new to
the Docker ecosystem.
When do the changes to the flink-conf.yaml file get activated?

From my understanding, I have to do this:

1 - Launch cluster with docker-compose up
2 - exec -it into each of the Docker instances and manually edit the
configuration file
3 - CTRL+C to gracefully kill cluster
4 - Relaunch cluster - it will now display correct heap values for the
JobManager and TaskManager.

This is cumbersome.
I know I can write my own scripts to automate this, but is this really the
correct way to launch a Flink standalone cluster on Docker with custom
memory options?
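For what it's worth, steps 2-4 can be partly scripted. A hedged sketch
(the set_flink_heap helper is hypothetical and only rewrites the two heap
keys; the config path is the one from my commands above):

```shell
# Hypothetical helper: rewrite the heap settings of a flink-conf.yaml in place.
set_flink_heap() {
  local conf="$1" jm_mb="$2" tm_mb="$3"
  sed -i \
    -e "s/^jobmanager\.heap\.mb:.*/jobmanager.heap.mb: ${jm_mb}/" \
    -e "s/^taskmanager\.heap\.mb:.*/taskmanager.heap.mb: ${tm_mb}/" \
    "$conf"
}

# Inside each container (docker exec -it <id> bash), one could then run:
#   set_flink_heap /home/myuser/docker-image-build-context/flink-1.1.3/conf/flink-conf.yaml 2048 8192
# and restart the cluster (CTRL+C, then docker-compose up) as described above.
```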

Should I instead change the Dockerfile to include a custom flink-conf.yaml
when building the image, so that the settings take effect from the start?
What is the correct way to tackle this?
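What I had in mind for the Dockerfile approach, as an untested sketch (the
destination path is the one from my config edits above; adjust as needed):

```dockerfile
# Hypothetical addition near the end of the existing Dockerfile:
# bake a pre-edited flink-conf.yaml into the image so every container
# starts with the desired heap sizes and network buffer count.
COPY flink-conf.yaml /home/myuser/docker-image-build-context/flink-1.1.3/conf/flink-conf.yaml
```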

Thank you very much!

Output is below in case you are curious:

myuser@myserver:~/docker-flink-image-builder$ docker exec -it $(docker ps
--filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123
-c flink.graph.example.App
/home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
/home/myuser/com-dblp.ungraph.txt
Cluster configuration: Standalone cluster with JobManager at /
172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: 55544e0ebc1f5014df53b200974afdbf. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://
flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:33     Job execution switched to status RUNNING.
11/14/2016 17:13:33     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
SCHEDULED
11/14/2016 17:13:33     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
DEPLOYING
11/14/2016 17:13:33     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/14/2016 17:13:34     DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:13:34     DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:13:34     DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:13:36     DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:13:36     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
FINISHED
Tuple size: 1049866
Submitting job with JobID: ab0931dc89e4a86de17549eeb518fde6. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://
flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:37     Job execution switched to status RUNNING.
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED 11/14/2016
17:13:37     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING 11/14/2016
17:13:37     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/14/2016 17:13:39     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
SCHEDULED
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
DEPLOYING
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to SCHEDULED
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to DEPLOYING
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to RUNNING
11/14/2016 17:13:44     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/14/2016 17:13:44     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/14/2016 17:13:44     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/14/2016 17:13:49     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/14/2016 17:13:50     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/14/2016 17:13:54     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to SCHEDULED
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to DEPLOYING
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to RUNNING
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to
SCHEDULED
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to
DEPLOYING
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to
RUNNING
11/14/2016 17:14:06     Sync (Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to SCHEDULED
11/14/2016 17:14:06     Sync (Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to DEPLOYING
11/14/2016 17:14:06     Sync (Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to RUNNING
11/14/2016 17:15:00     Sync (Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:15:00     CoGroup (Vertex State Updates)(1/1) switched to
FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:15:00     CoGroup (Messaging)(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:15:00     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
switched to FINISHED
11/14/2016 17:15:00     Job execution switched to status FINISHED.
Component count: 317080
Program execution finished
Job with JobID ab0931dc89e4a86de17549eeb518fde6 has finished.
Job Runtime: 83229 ms
Accumulator Results:
- e6c358969906b4ce1d682d6840281848 (java.lang.Long): 317080

Thank you for your attention. The issue seems solved.


Kind regards,

Miguel E. Coimbra
Email: miguel.e.coimbra@gmail.com
Skype: miguel.e.coimbra

On 14 November 2016 at 09:26, Ufuk Celebi <uce@apache.org> wrote:

> What do the TaskManager logs say wrt to allocation of managed memory?
>
> Something like:
>
> Limiting managed memory to ... of the currently free heap space ...,
> memory will be allocated lazily.
>
> What else did you configure in flink-conf?
>
> Looping in Greg and Vasia who maintain Gelly and are most-familiar with
> the internals.
>
> – Ufuk
>
>
> On 8 November 2016 at 22:35:22, Miguel Coimbra (miguel.e.coimbra@gmail.com)
> wrote:
> > Dear community,
> >
> > I have a problem which I hope you'll be able to help with.
> > I apologize in advance for the verbosity of the post.
> > I am running the Flink standalone cluster (not even storing to the
> > filesystem) with 2 Docker containers.
> >
> > I set the image of the Dockerfile for Flink 1.1.2, which was the same
> > version as the main class in the .jar.
> > The Docker image was configured to use Java 8, which is what the
> project's
> > pom.xml requires as well.
> > I have also edited the TaskManager conf/flink-conf.yaml to have the
> > following values:
> >
> > ....
> > taskmanager.heap.mb: 7512
> > ....
> > taskmanager.network.numberOfBuffers: 16048
> > ....
> >
> >
> > Properties of this host/docker setup:
> > - host machine has 256 GB of RAM
> > - job manager container is running with default flink config
> > - task manager has 7.5 GB of memory available
> > - task manager number of buffers is 16048, which is very generous
> > compared to the default value
> >
> > I am testing on the SNAP DBLP dataset:
> > https://snap.stanford.edu/data/com-DBLP.html
> > It has:
> >
> > 317080 nodes
> > 1049866 edges
> >
> > These are the relevant parts of the pom.xml of the project:
> > (note: the project executes without error in local execution without
> > the cluster)
> >
> > ....
> >
> > <properties>
> >     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> >     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
> >     <maven.compiler.source>1.8</maven.compiler.source>
> >     <maven.compiler.target>1.8</maven.compiler.target>
> >     <flink.version>1.1.2</flink.version>
> > </properties>
> >
> > .....
> >
> > <dependencies>
> >     <dependency>
> >         <groupId>org.apache.flink</groupId>
> >         <artifactId>flink-java</artifactId>
> >         <version>${flink.version}</version>
> >     </dependency>
> >     <dependency>
> >         <groupId>org.apache.flink</groupId>
> >         <artifactId>flink-core</artifactId>
> >         <version>${flink.version}</version>
> >     </dependency>
> >     <dependency>
> >         <groupId>org.apache.flink</groupId>
> >         <artifactId>flink-streaming-java_2.10</artifactId>
> >         <version>${flink.version}</version>
> >     </dependency>
> >     <dependency>
> >         <groupId>org.apache.flink</groupId>
> >         <artifactId>flink-clients_2.10</artifactId>
> >         <version>${flink.version}</version>
> >     </dependency>
> >     <dependency>
> >         <groupId>org.apache.flink</groupId>
> >         <artifactId>flink-gelly_2.10</artifactId>
> >         <version>${flink.version}</version>
> >     </dependency>
> >     <dependency>
> >         <groupId>junit</groupId>
> >         <artifactId>junit</artifactId>
> >         <version>3.8.1</version>
> >         <scope>test</scope>
> >     </dependency>
> > </dependencies>
> >
> >
> > I am running (what I believe to be) a simple Gelly application,
> performing
> > the ConnectedComponents algorithm with 30 iterations:
> >
> > public static void main(String[] args) {
> >     final ExecutionEnvironment env =
> >         ExecutionEnvironment.getExecutionEnvironment();
> >
> >     final String dataPath = args[0];
> >
> >     final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
> >         .fieldDelimiter("\t")  // node IDs are separated by tabs
> >         .ignoreComments("#")   // comments start with "#"
> >         .types(Long.class, Long.class);
> >
> >     try {
> >         System.out.println("Tuple size: " + edgeTuples.count());
> >     } catch (Exception e1) {
> >         e1.printStackTrace();
> >     }
> >
> >     /*
> >      * @param <K> the key type for edge and vertex identifiers
> >      * @param <VV> the value type for vertices
> >      * @param <EV> the value type for edges
> >      * public class Graph<K, VV, EV>
> >      */
> >     final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
> >         edgeTuples,
> >         new MapFunction<Long, Long>() {
> >             private static final long serialVersionUID =
> >                 8713516577419451509L;
> >             public Long map(Long value) {
> >                 return value;
> >             }
> >         },
> >         env
> >     );
> >
> >     try {
> >         /*
> >          * @param <K> key type
> >          * @param <VV> vertex value type
> >          * @param <EV> edge value type
> >          *
> >          * class ConnectedComponents<K extends Comparable<K>, EV>
> >          *     implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>>
> >          */
> >         DataSet<Vertex<Long, Long>> verticesWithComponents =
> >             graph.run(new ConnectedComponents<Long, NullValue>(30));
> >         System.out.println("Component count: " +
> >             verticesWithComponents.count());
> >     } catch (Exception e) {
> >         e.printStackTrace();
> >     }
> > }
> >
> >
> > However, the following is output on the host machine on execution:
> >
> > docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
> > flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
> > /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
> > /home/myuser/com-dblp.ungraph.txt
> >
> > Cluster configuration: Standalone cluster with JobManager at /
> > 172.19.0.2:6123
> > Using address 172.19.0.2:6123 to connect to JobManager.
> > JobManager web interface address http://172.19.0.2:8081
> > Starting execution of program
> > Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for
> > job completion.
> > Connected to JobManager at Actor[akka.tcp://
> > flink@172.19.0.2:6123/user/jobmanager#-658812967]
> >
> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> > SCHEDULED
> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> > DEPLOYING
> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> RUNNING
> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> > FINISHED
> > 11/08/2016 21:22:44 Job execution switched to status FINISHED.
> > Tuple size: 1049866
> > Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for
> > job completion.
> > Connected to JobManager at Actor[akka.tcp://
> > flink@172.19.0.2:6123/user/jobmanager#-658812967]
> > 11/08/2016 21:22:45 Job execution switched to status RUNNING.
> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> > fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
> >
> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> > fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
> >
> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> > fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> > fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
> > fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> > SCHEDULED
> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
> > fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> > DEPLOYING
> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> RUNNING
> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
> > fromDataSet(Graph.java:216))(1/1) switched to RUNNING
> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to SCHEDULED
> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to DEPLOYING
> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to RUNNING
> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> > fromDataSet(Graph.java:216)) -> Map (Map at
> > fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> > fromDataSet(Graph.java:216)) -> Map (Map at
> > fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> > fromDataSet(Graph.java:216)) -> Map (Map at
> > fromDataSet(Graph.java:217))(1/1) switched to RUNNING
> > 11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> FINISHED
> > 11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at
> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
> > fromDataSet(Graph.java:216))(1/1) switched to FINISHED
> > 11/08/2016 21:22:48 CHAIN Reduce (Distinct at
> > fromDataSet(Graph.java:216)) -> Map (Map at
> > fromDataSet(Graph.java:217))(1/1) switched to FINISHED
> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
> ))(1/1)
> > switched to SCHEDULED
> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
> ))(1/1)
> > switched to DEPLOYING
> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
> ))(1/1)
> > switched to RUNNING
> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
> ))(1/1)
> > switched to FAILED
> > java.lang.IllegalArgumentException: Too few memory segments provided.
> > Hash Table needs at least 33 memory segments.
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
> > at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
> > at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
> > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > 11/08/2016 21:22:48 Job execution switched to status FAILING.
> > java.lang.IllegalArgumentException: Too few memory segments provided.
> > Hash Table needs at least 33 memory segments.
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
> > at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
> > at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
> > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > The results I found online so far were not enough, and I am not sure
> > of the best way to solve this.
> >
> > If anyone can help diagnose and correct this issue, I would be very
> > thankful.
> >
> > Best regards,
> >
> > Miguel E. Coimbra
> > Email: miguel.e.coimbra@gmail.com
> > Skype: miguel.e.coimbra
> >
>
>
