flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vasiliki Kalavri <vasilikikala...@gmail.com>
Subject Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.
Date Tue, 15 Nov 2016 14:38:24 GMT
Hi Miguel,

I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm
glad that you've found a solution :)

I've never used flink with docker, so I'm probably not the best person to
advise you on this. However, if I understand correctly, you're changing the
configuration before submitting the job but while the flink cluster is
already running. I don't know if docker is supposed to do something
differently, but after a flink cluster has been started, nodes won't reload
any changes you make to the flink-conf.yaml. You'll either have to make
your changes before starting the cluster or re-start.

Cheers,
-Vasia.

On 14 November 2016 at 18:33, Miguel Coimbra <miguel.e.coimbra@gmail.com>
wrote:

> Hello,
>
> I believe I have figured this out.
>
> First, I tried Aandrey Melentyev's suggestion of executing with Apache
> Flink 1.1.3, both with default conf/flink-conf.yaml parameters as well as
> with some changes to provide additional memory. However, the same error
> happened.
>
> Note: I changed my project's pom.xml and generated the .jar again using
> Maven.
> I also copied the new .jar to both Docker instances.
>
> The test machine has 256 GB RAM and it is a scenario of two Docker
> containers.
> I send attached the relevant parts of the logs of the JobManager and of
> the TaskManager.
> Regarding memory in the TaskManager log, I was looking at a couple of
> executions and noticed something strange:
>
> 2016-11-14 15:48:45,256 INFO  org.apache.flink.runtime.io.ne
> twork.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool
> (number of memory segments: 2048, bytes per segment: 32768).
> 2016-11-14 15:48:45,413 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              - Limiting managed memory to 0.7 of the
> currently free heap space (310 MB), memory will be allocated lazily.
>
> After that, I looked at the start of the TaskManager log and found this:
>
> 2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  Starting TaskManager (Version: 1.1.3,
> Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> 2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  Current user: flink
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  JVM: OpenJDK 64-Bit Server VM - Oracle
> Corporation - 1.8/25.92-b14
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  Maximum heap size: 512 MiBytes
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  JAVA_HOME:
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  Hadoop version: 2.7.2
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -  JVM Options:
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -     -XX:+UseG1GC
>
>
> *2016-11-14 15:48:38,850 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              -
> -Xms512M2016-11-14 15:48:38,850 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              -
> -Xmx512M*2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
>
> It seems it is running with only 512 MB, which is the default.
> This in spite of me having edited the flink-conf.yaml file before
> invoking the program for the cluster.
> I looked at the log of the JobManager and the same thing happened: it was
> using the default 256 MB instead of my 1024MB.
>
> - To recap, I built the Docker Flink image with (I send the Dockerfile
> attached):
>
> cd docker-flink-image-builder/
> ls
> Dockerfile  Dockerfile~  README.md  README.md~
> bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
> ./build.sh
>
> The only file I changed from those is the Dockerfile.
> This set of files was obtained from the Flink repository.
> I used docker-compose up to start the standalone cluster:
>
> screen
> cd docker-flink-image-builder/
> ls
> Dockerfile  Dockerfile~  README.md  README.md~
> bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
> docker-compose.yml  docker-entrypoint.sh*
> docker-compose up
>
> Then I accessed each Docker instance:
>
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
> /bin/sh
> docker exec -it $(docker ps --filter name=dockerflinkimagebuilder_taskmanager_1
> --format={{.ID}}) /bin/sh
>
> While inside each of those, I started a bash shell and changed the config
> file like so:
>
> bash
> cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
> vi flink-conf.yaml
>
> I have edited (on both the JobManager and the TaskManager) the following
> settings:
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 1024
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 4096
>
> # The number of buffers for the network stack.
> taskmanager.network.numberOfBuffers: 4096
>
> It seems that changes I make to the flink-config.yaml file *are only
> reflected after I kill the cluster and call *docker-compose up again.
>
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
> flink run -m 707a534982e6:6123 -c flink.graph.example.App
> /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
> /home/myuser/com-dblp.ungraph.txt
>
> The cluster now started with the correct memory values, but the result was
> the same (it is in the logs).
> However, I then doubled the memory again, so that I had:
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 2048
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 8192
>
> ​After this, I killed the cluster (CTRL+C) on the screen which had started
> it (graceful exit).
> This time, after starting again with docker-compose up, I launched the
> program again and it worked!
>
> However, there is something I don't understand, perhaps because I am new
> to the Docker ecosystem.
> When do the changes to the flink-conf.yaml file get activated?
>
> From my understanding, I have to do this:
>
> 1 - Launch cluster with docker-compose up
> 2 - exec -it into each of the Docker instances and manually edit the
> configuration file
> 3 - CTRL+C to gracefully kill cluster
> 4 - Relaunch cluster - it will now display correct heap values for the
> JobManager and TaskManager.
>
> *This is cumbersome.*
> I know I can make my own scripts to automate this, but is this really the
> correct way to launch a Flink standalone cluster on Docker with *custom
> memory options?*
>
> Should I instead change the Dockerfile to include a custom flink-conf.yaml
> file when building the image? (so this would be taken right from the start)
> What is the correct way to tackle this?
>
> Thank you very much!
>
> Output is below in case you are curious:
>
> myuser@myserver:~/docker-flink-image-builder$ docker exec -it $(docker ps
> --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123
> -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
> /home/myuser/com-dblp.ungraph.txt
> Cluster configuration: Standalone cluster with JobManager at /
> 172.19.0.2:6123
> Using address 172.19.0.2:6123 to connect to JobManager.
> JobManager web interface address http://172.19.0.2:8081
> Starting execution of program
> Submitting job with JobID: 55544e0ebc1f5014df53b200974afdbf. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://flink@172.19.
> 0.2:6123/user/jobmanager#-1305686264]
> 11/14/2016 17:13:33     Job execution switched to status RUNNING.
> 11/14/2016 17:13:33     DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> SCHEDULED
> 11/14/2016 17:13:33     DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> DEPLOYING
> 11/14/2016 17:13:33     DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> RUNNING
> 11/14/2016 17:13:34     DataSink (count())(1/1) switched to SCHEDULED
> 11/14/2016 17:13:34     DataSink (count())(1/1) switched to DEPLOYING
> 11/14/2016 17:13:34     DataSink (count())(1/1) switched to RUNNING
> 11/14/2016 17:13:36     DataSink (count())(1/1) switched to FINISHED
> 11/14/2016 17:13:36     DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> FINISHED
> Tuple size: 1049866
> Submitting job with JobID: ab0931dc89e4a86de17549eeb518fde6. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://flink@172.19.
> 0.2:6123/user/jobmanager#-1305686264]
> 11/14/2016 17:13:37     Job execution switched to status RUNNING.
> 11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED 11/14/2016
> 17:13:37     CHAIN DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING 11/14/2016
> 17:13:37     CHAIN DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
> 11/14/2016 17:13:39     CHAIN DataSource (at main(App.java:25) (
> org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
> 11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
> 11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
> 11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> SCHEDULED
> 11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> DEPLOYING
> 11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to RUNNING
> 11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> RUNNING
> 11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to SCHEDULED
> 11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to DEPLOYING
> 11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to RUNNING
> 11/14/2016 17:13:44     CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1)
> switched to SCHEDULED
> 11/14/2016 17:13:44     CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1)
> switched to DEPLOYING
> 11/14/2016 17:13:44     CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1)
> switched to RUNNING
> 11/14/2016 17:13:49     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> FINISHED
> 11/14/2016 17:13:50     CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to FINISHED
> 11/14/2016 17:13:54     CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1)
> switched to FINISHED
> 11/14/2016 17:13:54     IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to SCHEDULED
> 11/14/2016 17:13:54     IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to DEPLOYING
> 11/14/2016 17:13:54     IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to RUNNING
> 11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to
> SCHEDULED
> 11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to
> DEPLOYING
> 11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to
> RUNNING
> 11/14/2016 17:14:06     Sync (Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to SCHEDULED
> 11/14/2016 17:14:06     Sync (Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to DEPLOYING
> 11/14/2016 17:14:06     Sync (Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to RUNNING
> 11/14/2016 17:15:00     Sync (Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to FINISHED
> 11/14/2016 17:15:00     DataSink (count())(1/1) switched to SCHEDULED
> 11/14/2016 17:15:00     DataSink (count())(1/1) switched to DEPLOYING
> 11/14/2016 17:15:00     CoGroup (Vertex State Updates)(1/1) switched to
> FINISHED
> 11/14/2016 17:15:00     DataSink (count())(1/1) switched to RUNNING
> 11/14/2016 17:15:00     CoGroup (Messaging)(1/1) switched to FINISHED
> 11/14/2016 17:15:00     DataSink (count())(1/1) switched to FINISHED
> 11/14/2016 17:15:00     IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1)
> switched to FINISHED
> 11/14/2016 17:15:00     Job execution switched to status FINISHED.
> Component count: 317080
> Program execution finished
> Job with JobID ab0931dc89e4a86de17549eeb518fde6 has finished.
> Job Runtime: 83229 ms
> Accumulator Results:
> - e6c358969906b4ce1d682d6840281848 (java.lang.Long): 317080​
>
> Thanks you for the attention. It seems solved.​
>
>
> Kind regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coimbra@gmail.com <miguel.e.coimbra@ist.utl.pt>
> Skype: miguel.e.coimbra
>
> On 14 November 2016 at 09:26, Ufuk Celebi <uce@apache.org> wrote:
>
>> What do the TaskManager logs say wrt to allocation of managed memory?
>>
>> Something like:
>>
>> Limiting managed memory to ... of the currently free heap space ...,
>> memory will be allocated lazily.
>>
>> What else did you configure in flink-conf?
>>
>> Looping in Greg and Vasia who maintain Gelly and are most-familiar with
>> the internals.
>>
>> – Ufuk
>>
>>
>> On 8 November 2016 at 22:35:22, Miguel Coimbra (
>> miguel.e.coimbra@gmail.com) wrote:
>> > Dear community,
>> >
>> > I have a problem which I hope you'll be able to help with.
>> > I apologize in advance for the verbosity of the post.
>> > I am running the Flink standalone cluster (not even storing to the
>> > filesystem) with 2 Docker containers.
>> >
>> > I set the image of the Dockerfile for Flink 1.1.2, which was the same
>> > version of the main class in the .jar
>> > The Docker image was configured to use Java 8, which is what the
>> project's
>> > pom.xml requires as well.
>> > I have also edited the TaskManager conf/flink-con.yaml to have the
>> > following values:
>> >
>> > ....
>> > taskmanager.heap.mb: 7512
>> > ....
>> > taskmanager.network.numberOfBuffers: 16048
>> > ....
>> >
>> >
>> > Properties of this host/docker setup:
>> > - host machine has *256 GB *of RAM
>> > - job manager container is running with default flink config
>> > - task manager has *7.5 GB *of memory available
>> > - task manager number of buffers is *16048 *which is very generous
>> compared
>> > to the default value
>> >
>> > I am testing on the SNAP DBLP dataset:
>> > https://snap.stanford.edu/data/com-DBLP.html
>> > It has:
>> >
>> > 317080 nodes
>> > 1049866 edges
>> >
>> > These are the relevant parts of the pom.xml of the project:
>> > *(note: the project executes without error for local executions without
>> the
>> > cluster)*
>> >
>> > ....
>> >
>> > UTF-8
>> >
>> > UTF-8
>> > 1.8
>> > 1.8
>> > 1.1.2
>> >
>> > .....
>> >
>> >
>> > org.apache.flink
>> > flink-java
>> > ${flink.version}
>> >
>> >
>> > org.apache.flink
>> > flink-core
>> > ${flink.version}
>> >
>> >
>> > org.apache.flink
>> > flink-streaming-java_2.10
>> > ${flink.version}
>> >
>> >
>> > org.apache.flink
>> > flink-clients_2.10
>> > ${flink.version}
>> >
>> >
>> > org.apache.flink
>> > flink-gelly_2.10
>> > ${flink.version}
>> >
>> >
>> > junit
>> > junit
>> > 3.8.1
>> > test
>> >
>> >
>> >
>> > I am running (what I believe to be) a simple Gelly application,
>> performing
>> > the ConnectedComponents algorithm with 30 iterations:
>> >
>> > public static void main(String[] args) {
>> > final ExecutionEnvironment env =
>> > ExecutionEnvironment.getExecutionEnvironment();
>> >
>> >
>> > final String dataPath = args[0];
>> >
>> > final DataSet> edgeTuples =
>> > env.readCsvFile(dataPath)
>> > .fieldDelimiter("\t") // node IDs are separated by spaces
>> > .ignoreComments("#") // comments start with "%"
>> > .types(Long.class, Long.class);
>> >
>> > try {
>> > System.out.println("Tuple size: " + edgeTuples.count());
>> > } catch (Exception e1) {
>> > e1.printStackTrace();
>> > }
>> >
>> > /*
>> > * @param the key type for edge and vertex identifiers
>> > * @param the value type for vertices
>> > * @param the value type for edges
>> > * public class Graph
>> > */
>> >
>> >
>> > final Graph graph = Graph.fromTuple2DataSet(
>> > edgeTuples,
>> > new MapFunction() {
>> > private static final long serialVersionUID =
>> > 8713516577419451509L;
>> > public Long map(Long value) {
>> > return value;
>> > }
>> > },
>> > env
>> > );
>> >
>> >
>> > try {
>> > /**
>> > * @param key type
>> > * @param vertex value type
>> > * @param edge value type
>> > * @param the return type
>> >
>> > class ConnectedComponents, EV>
>> > implements GraphAlgorithm>>
>> > */
>> >
>> > DataSet> verticesWithComponents =
>> > graph.run(new ConnectedComponents(30));
>> > System.out.println("Component count: " +
>> > verticesWithComponents.count());
>> > } catch (Exception e) {
>> > e.printStackTrace();
>> > }
>> > }
>> >
>> >
>> > However, the following is output on the host machine on execution:
>> >
>> > docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
>> > flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
>> > /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
>> > /home/myuser/com-dblp.ungraph.txt
>> >
>> > Cluster configuration: Standalone cluster with JobManager at /
>> > 172.19.0.2:6123
>> > Using address 172.19.0.2:6123 to connect to JobManager.
>> > JobManager web interface address http://172.19.0.2:8081
>> > Starting execution of program
>> > Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting
>> for
>> > job completion.
>> > Connected to JobManager at Actor[akka.tcp://
>> > flink@172.19.0.2:6123/user/jobmanager#-658812967]
>> >
>> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> > SCHEDULED
>> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> > DEPLOYING
>> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> RUNNING
>> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
>> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
>> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
>> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
>> > 11/08/2016 21:22:44 DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> > FINISHED
>> > 11/08/2016 21:22:44 Job execution switched to status FINISHED.
>> > Tuple size: 1049866
>> > Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting
>> for
>> > job completion.
>> > Connected to JobManager at Actor[akka.tcp://
>> > flink@172.19.0.2:6123/user/jobmanager#-658812967]
>> > 11/08/2016 21:22:45 Job execution switched to status RUNNING.
>> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
>> > fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>> >
>> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
>> > fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>> >
>> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
>> > fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
>> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
>> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
>> > fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
>> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
>> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
>> > fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
>> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
>> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
>> > SCHEDULED
>> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
>> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
>> > fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
>> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
>> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
>> > DEPLOYING
>> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
>> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
>> RUNNING
>> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
>> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
>> > fromDataSet(Graph.java:216))(1/1) switched to RUNNING
>> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to SCHEDULED
>> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to DEPLOYING
>> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to RUNNING
>> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
>> > fromDataSet(Graph.java:216)) -> Map (Map at
>> > fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
>> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
>> > fromDataSet(Graph.java:216)) -> Map (Map at
>> > fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
>> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
>> > fromDataSet(Graph.java:216)) -> Map (Map at
>> > fromDataSet(Graph.java:217))(1/1) switched to RUNNING
>> > 11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
>> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
>> FINISHED
>> > 11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at
>> > fromDataSet(Graph.java:216)) -> Combine(Distinct at
>> > fromDataSet(Graph.java:216))(1/1) switched to FINISHED
>> > 11/08/2016 21:22:48 CHAIN Reduce (Distinct at
>> > fromDataSet(Graph.java:216)) -> Map (Map at
>> > fromDataSet(Graph.java:217))(1/1) switched to FINISHED
>> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
>> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
>> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
>> ))(1/1)
>> > switched to SCHEDULED
>> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
>> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
>> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
>> ))(1/1)
>> > switched to DEPLOYING
>> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
>> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
>> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
>> ))(1/1)
>> > switched to RUNNING
>> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
>> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
>> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a
>> ))(1/1)
>> > switched to FAILED
>> > java.lang.IllegalArgumentException: Too few memory segments provided.
>> Hash
>> > Table needs at least 33 memory segments.
>> > at
>> > org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> (CompactingHashTable.java:206)
>> > at
>> > org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> (CompactingHashTable.java:191)
>> > at
>> > org.apache.flink.runtime.iterative.task.IterationHeadTask.in
>> itCompactingHashTable(IterationHeadTask.java:175)
>> > at
>> > org.apache.flink.runtime.iterative.task.IterationHeadTask.ru
>> n(IterationHeadTask.java:272)
>> > at
>> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > 11/08/2016 21:22:48 Job execution switched to status FAILING.
>> > java.lang.IllegalArgumentException: Too few memory segments provided.
>> Hash
>> > Table needs at least 33 memory segments.
>> > at
>> > org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> (CompactingHashTable.java:206)
>> > at
>> > org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> (CompactingHashTable.java:191)
>> > at
>> > org.apache.flink.runtime.iterative.task.IterationHeadTask.in
>> itCompactingHashTable(IterationHeadTask.java:175)
>> > at
>> > org.apache.flink.runtime.iterative.task.IterationHeadTask.ru
>> n(IterationHeadTask.java:272)
>> > at
>> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > The results I found online so far were not enough, and I am not sure as
>> to
>> > the best way to solve this.
>> >
>> > If anyone can help diagnose and correct this issue, I would be very
>> > thankful.
>> >
>> > Best regards,
>> >
>> > Miguel E. Coimbra
>> > Email: miguel.e.coimbra@gmail.com
>> > Skype: miguel.e.coimbra
>> >
>>
>>
>

Mime
View raw message