flink-dev mailing list archives

From Krzysztof Pasierbinski <Krzysztof.Pasierbin...@dfki.de>
Subject Cluster execution of an example program ("Word count") and a problem related to the modificated example
Date Sat, 28 Jun 2014 16:59:02 GMT
Hi all,
I am new to Flink/Stratosphere, so first of all, hello to everyone. I was trying to configure
a simple 2-node cluster (Ubuntu 12.04) and came across the following problems (using Stratosphere
0.5):

1)      I executed the "Word count" example, and after the execution the result is split into
2 files (one on each node). How can I modify the program to get the whole result
as a single merged file on the master node?

2)      I have written my own program based on the structure of the "Word count" program. It
runs without problems in local mode for files of any size. For small files
it works fine on the cluster too. Unfortunately, I get the following error for bigger files (2000
lines):

06/28/2014 18:15:20:    Job execution switched to status SCHEDULED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(2/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(2/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(1/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(1/2) switched to ASSIGNED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(2/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (1/2) switched to READY
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to READY
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (1/2) switched to STARTING
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to STARTING
06/28/2014 18:15:20:    Job execution switched to status RUNNING
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to RUNNING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (2/2) switched to READY
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (2/2) switched to STARTING
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (1/2) switched to RUNNING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (2/2) switched to RUNNING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(1/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(2/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c)
(2/2) switched to CANCELED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(1/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(2/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0)
(2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer)
-> Combine (SUM(2),MAX(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2)
-> Combine (SUM(2),MIN(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (1/2) switched to FAILED
java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt
[41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt
(No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:140)
    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt)
- UTF-8) (2/2) switched to CANCELED
06/28/2014 18:15:20:    Job execution switched to status FAILED
Error: The program execution failed: java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt
[41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt
(No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:140)
    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

eu.stratosphere.client.program.ProgramInvocationException: The program execution failed: java.io.IOException:
Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt
[41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt
(No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:140)
    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

    at eu.stratosphere.client.program.Client.run(Client.java:297)
    at eu.stratosphere.client.program.Client.run(Client.java:263)
    at eu.stratosphere.client.program.Client.run(Client.java:257)
    at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
    at eu.stratosphere.example.java.wordcount.WordCount.main(WordCount.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
    at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
    at eu.stratosphere.client.program.Client.run(Client.java:215)
    at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
    at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
    at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
    at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
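
The trace above shows the local file system on the node running subtask (1/2) failing to open a `file:` path. One way to narrow this down is to check on each node of the cluster whether that path exists there. The following is only a minimal sketch in plain Java, not part of Stratosphere; the class name `InputFileCheck` and its helper method are hypothetical, and the default path is simply copied from the log.

```java
import java.io.File;
import java.net.InetAddress;

// Hypothetical helper, not part of Stratosphere: run it on every node
// to see whether the input path from the log is visible locally.
final class InputFileCheck {

    /** Returns true if the path names a regular, readable file on this machine. */
    static boolean isReadableFile(String path) {
        File f = new File(path);
        return f.isFile() && f.canRead();
    }

    public static void main(String[] args) throws Exception {
        // Default path copied from the log; pass another path as the first argument.
        String path = args.length > 0
                ? args[0]
                : "/home/krzysztof/stratosphere05/generatedFrequencies.txt";
        String host = InetAddress.getLocalHost().getHostName();
        System.out.println(host + ": " + path + " -> "
                + (isReadableFile(path) ? "readable" : "missing or unreadable"));
    }
}
```

Compiled with `javac InputFileCheck.java` and started with `java InputFileCheck` on each node, it prints one line per node. If one node reports the path as missing, that would be consistent with the `FileNotFoundException` in the trace.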

Cluster configuration:
Slaves:
192.168.11.216
192.168.11.202

Job Manager:
jobmanager.rpc.address: 192.168.11.202


I would be very grateful for any help.

Best regards,
Krzysztof Pasierbiński

