flink-dev mailing list archives

From Krzysztof Pasierbinski <Krzysztof.Pasierbin...@dfki.de>
Subject RE: Cluster execution of an example program ("Word count") and a problem related to the modified example
Date Sat, 05 Jul 2014 10:52:30 GMT
Hi Stephan,
thank you for your support.
I don't have a dedicated master node. My master node is also a worker; the second
node is a worker only. The small and big files are identical in format (type, path,
structure, even the name); only the size and the values differ. At first, the input file was
only available on the master node (and for small files that works!). After copying the input file
to the second worker node it works fine, but I don't think that is an efficient way to go,
so I am going to switch to HDFS.
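Whether a worker can read an input path depends on its scheme: a `file:` path, like the one in the plan's `TextInputFormat` source, is opened on each worker's own local disk, while an `hdfs://` path is read from the shared file system. The Java sketch below classifies paths that way. The class `InputPathCheck` and the `namenode:9000` address are hypothetical, and treating a path without a scheme as local is an assumption of this sketch.

```java
import java.net.URI;

/**
 * Hypothetical helper (not part of the Flink/Stratosphere API): tells whether an
 * input path must exist on every worker or is served by a shared file system.
 */
public class InputPathCheck {

    /** Returns true if the path is resolved on each worker's local disk. */
    public static boolean isNodeLocal(String path) {
        String scheme = URI.create(path).getScheme();
        // Assumption of this sketch: a path without a scheme is treated as a local path.
        return scheme == null || scheme.equals("file");
    }

    public static void main(String[] args) {
        String[] paths = {
            "file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt",
            // Hypothetical HDFS location; host and port are placeholders.
            "hdfs://namenode:9000/user/krzysztof/generatedFrequencies2.txt"
        };
        for (String p : paths) {
            String verdict = isNodeLocal(p)
                ? "must exist at this exact path on every worker"
                : "read from the shared file system";
            System.out.println(p + " -> " + verdict);
        }
    }
}
```

Copying the file to the second worker fixing the problem is consistent with the `file:` case above; an HDFS path avoids keeping per-node copies in sync.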


-----Original Message-----
From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of Stephan Ewen
Sent: Tuesday, July 1, 2014 14:58
To: dev@flink.incubator.apache.org
Subject: Re: Cluster execution of an example program ("Word count") and a problem related
to the modified example

Hey Krzysztof!

Everything looks standard there.

Let me ask a few questions to make sure I understand the setup correctly:
 - You are running a two-node setup. Does one node run both the master and a worker, and the other
a worker only? Or do you have a dedicated master node?
 - Are the examples on small and on large data strictly the same, except for the differently
sized input files?

Most importantly:
 - Is the input file available on all workers, under the path "/home/krzysztof/stratosphere05/generatedFrequencies2.txt"?

My guess right now is still that the workers do not see the file properly.

Greetings,
Stephan
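One way to answer the path question is to run a small check on every worker (for example over ssh). The Java sketch below, with the hypothetical class name `CheckInputOnNode`, reports whether the given absolute path is a readable regular file on the node where it runs and prints its size, so a missing copy or a copy of a different size shows up immediately. It uses only `java.nio.file` and is not part of Flink.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical diagnostic: run on each worker to see whether the input file is visible there. */
public class CheckInputOnNode {

    /** Returns the file size in bytes, or -1 if the path is missing, not a file, or unreadable. */
    public static long visibleSize(String path) {
        Path p = Paths.get(path);
        if (!Files.isRegularFile(p) || !Files.isReadable(p)) {
            return -1L;
        }
        try {
            return Files.size(p);
        } catch (IOException e) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        String path = args.length > 0 ? args[0]
            : "/home/krzysztof/stratosphere05/generatedFrequencies2.txt";
        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            host = "unknown-host";
        }
        long size = visibleSize(path);
        // Compare this line across all nodes: every worker must report the same size.
        System.out.println(host + ": " + (size < 0 ? "NOT visible: " + path : size + " bytes at " + path));
    }
}
```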




On Sun, Jun 29, 2014 at 4:50 PM, Krzysztof Pasierbinski <Krzysztof.Pasierbinski@dfki.de>
wrote:

> Hi Stephan,
> I have a two-node configuration. The first node is a master and a
> worker, the second node is a worker only. The file path, the Flink (Stratosphere)
> version and the operating system are the same on both nodes.
> My test program is in the attachment (a simple modification of the "Word count"
> example).
> The execution plan looks like this:
> {
>     "nodes": [
>
>     {
>         "id": 4,
>         "type": "source",
>         "pact": "Data Source",
>         "contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "3..6 GB" },
>             { "name": "Est. Cardinality", "value": "98.43 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "3..6 GB" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 3,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 2,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 3, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 1,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 2, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 0,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 1, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 8,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 7,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 8, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 6,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 7, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 5,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 6, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     }
>     ]
> }
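Following the `predecessors` links, the plan has two branches that share the source (id 4). Each branch is: FlatMap tokenizer (Forward) → GroupReduce with driver strategy "Sorted Combine" (local pre-aggregation, Forward) → "Hash Partition on [0]" → GroupReduce "Sorted Group Reduce" → CSV sink. The single-JVM Java sketch below simulates one branch with parallelism 2. It is a simplification under stated assumptions: it counts words instead of computing the program's `SUM(2),MAX(1)` / `SUM(2),MIN(1)` aggregates (whose tuple layout is not shown here), it partitions with `hashCode()` modulo the parallelism, which is not necessarily the hash function Flink uses, and the class name `PlanSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Single-JVM simulation (not Flink code) of: tokenize -> combine -> hash partition on [0] -> reduce. */
public class PlanSketch {

    /** Each inner list is the input split read by one parallel source instance. */
    public static List<Map<String, Integer>> run(List<List<String>> splits, int parallelism) {
        // One bucket per receiving reducer instance (the result of the shuffle).
        List<List<Map.Entry<String, Integer>>> shuffled = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            shuffled.add(new ArrayList<>());
        }

        for (List<String> split : splits) {
            // FlatMap + "Sorted Combine": pre-aggregate inside this instance (ship strategy Forward).
            Map<String, Integer> combined = new TreeMap<>();
            for (String line : split) {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (!word.isEmpty()) {
                        combined.merge(word, 1, Integer::sum);
                    }
                }
            }
            // "Hash Partition on [0]": all partial counts for a word go to the same reducer.
            for (Map.Entry<String, Integer> e : combined.entrySet()) {
                int target = Math.floorMod(e.getKey().hashCode(), parallelism);
                shuffled.get(target).add(e);
            }
        }

        // "Sorted Group Reduce": each reducer sums the partial counts it received.
        List<Map<String, Integer>> result = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> bucket : shuffled) {
            Map<String, Integer> reduced = new TreeMap<>();
            for (Map.Entry<String, Integer> e : bucket) {
                reduced.merge(e.getKey(), e.getValue(), Integer::sum);
            }
            result.add(reduced);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(
            List.of("to be or not", "to be"),
            List.of("be quick"));
        System.out.println(run(splits, 2));
    }
}
```

Only the partial counts cross the (simulated) network, which is why the combine step runs before the hash partitioning.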
>
> -----Original Message-----
> From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of
> Stephan Ewen
> Sent: Sunday, June 29, 2014 16:33
> To: dev@flink.incubator.apache.org
> Subject: Re: Cluster execution of an example program ("Word count")
> and a problem related to the modified example
>
> Hi Krzysztof!
>
> Indeed, the input size should not matter. Can you tell us the details
> of the setup that worked?
>
> The built-in examples work without a distributed file system because
> they do not depend on files. The example programs use a Java
> Collection as input, which is distributed as part of the program.
>
> Stephan
>
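The point about the built-in examples can be illustrated with plain Java serialization: data that travels inside the program needs no file on any worker. The sketch below (hypothetical class `ShipCollection`) serializes an `ArrayList<String>` to bytes and restores it, standing in for shipping the collection together with the job. It only illustrates the idea and does not show the mechanism Flink actually uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Illustration only: in-memory input data packed into bytes that could travel with a job. */
public class ShipCollection {

    /** Serializes the collection, as if packaging it with the submitted program. */
    public static byte[] pack(ArrayList<String> data) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(data);
        }
        return bytes.toByteArray();
    }

    /** Restores the collection on the receiving side; no file system is involved. */
    @SuppressWarnings("unchecked")
    public static List<String> unpack(byte[] payload) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (List<String>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> lines = new ArrayList<>(List.of("to be or not to be", "that is the question"));
        byte[] payload = pack(lines);
        System.out.println(payload.length + " bytes shipped, restored: " + unpack(payload));
    }
}
```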