flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Krzysztof Pasierbinski <Krzysztof.Pasierbin...@dfki.de>
Subject AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example
Date Sun, 29 Jun 2014 14:50:51 GMT
Hi Stephan,
I have a 2-node configuration. The first node is a master and a worker, the second node
is a worker. The file path, Flink (Stratosphere) version and operating system are the same on both
nodes.
My test program is attached (a simple modification of the "Word count" example).
The execution plan looks like this:
{
    "nodes": [

    {
        "id": 4,
        "type": "source",
        "pact": "Data Source",
        "contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "3..6 GB" },
            { "name": "Est. Cardinality", "value": "98.43 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "3..6 GB" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 3,
        "type": "pact",
        "pact": "FlatMap",
        "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 4, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Map",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 2,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MAX(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 3, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Sorted Combine",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 1,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MAX(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
        ],
        "driver_strategy": "Sorted Group Reduce",
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "(unknown)" },
            { "name": "Disk I/O", "value": "(unknown)" },
            { "name": "CPU", "value": "(unknown)" },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 0,
        "type": "sink",
        "pact": "Data Sink",
        "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 1, "ship_strategy": "Forward"}
        ],
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 8,
        "type": "pact",
        "pact": "FlatMap",
        "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 4, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Map",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 7,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MIN(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 8, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Sorted Combine",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 6,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MIN(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
        ],
        "driver_strategy": "Sorted Group Reduce",
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "(unknown)" },
            { "name": "Disk I/O", "value": "(unknown)" },
            { "name": "CPU", "value": "(unknown)" },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 5,
        "type": "sink",
        "pact": "Data Sink",
        "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 6, "ship_strategy": "Forward"}
        ],
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    }
    ]
}

-----Ursprüngliche Nachricht-----
Von: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] Im Auftrag von Stephan Ewen
Gesendet: Sonntag, 29. Juni 2014 16:33
An: dev@flink.incubator.apache.org
Betreff: Re: Cluster execution of an example program ("Word count") and a problem related
to the modificated example

Hi Krzysztof!

Indeed, the input size should not matter. Can you tell us the details of the setup that worked?

The built-in examples work without distributed file system, because they do not depend on
files. The example programs set a Java Collection as the input, which gets distributed as
part of the program.

Stephan
Mime
  • Unnamed multipart/mixed (inline, None, 0 bytes)
View raw message