flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-935) Bug in Optimizer when reusing work across iterations
Date Fri, 13 Jun 2014 15:09:04 GMT

     [ https://issues.apache.org/jira/browse/FLINK-935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen updated FLINK-935:
-------------------------------

    Attachment: screenshot.png

> Bug in Optimizer when reusing work across iterations
> ----------------------------------------------------
>
>                 Key: FLINK-935
>                 URL: https://issues.apache.org/jira/browse/FLINK-935
>             Project: Flink
>          Issue Type: Bug
>          Components: Compiler/Optimizer
>    Affects Versions: 0.6-incubating, pre-apache-0.5, pre-apache-0.5.1
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.6-incubating, pre-apache-0.5.2
>
>         Attachments: screenshot.png
>
>
> The following plan, created by the optimizer, is invalid:
> {code}
> {
> 	"nodes": [
> 	{
> 		"id": 3,
> 		"type": "source",
> 		"pact": "Data Source",
> 		"contents": "CSV Input (,) /some/file/path",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "0.0" },
> 			{ "name": "CPU", "value": "0.0" },
> 			{ "name": "Cumulative Network", "value": "0.0" },
> 			{ "name": "Cumulative Disk I/O", "value": "0.0" },
> 			{ "name": "Cumulative CPU", "value": "0.0" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 2,
> 		"type": "pact",
> 		"pact": "Map",
> 		"contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$DuplicateValue",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 3, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]"}
> 		],
> 		"driver_strategy": "Map",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
> 			{ "name": "Partitioned on", "value": "[0]" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "[0:ASC]" },
> 			{ "name": "Grouped on", "value": "[0]" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "(unknown)" },
> 			{ "name": "Disk I/O", "value": "(unknown)" },
> 			{ "name": "CPU", "value": "(unknown)" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"step_function": [
> 	{
> 		"id": 9,
> 		"type": "pact",
> 		"pact": "Bulk Partial Solution",
> 		"contents": "Partial Solution",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
> 			{ "name": "Partitioned on", "value": "[0]" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "[0:ASC]" },
> 			{ "name": "Grouped on", "value": "[0]" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "0.0" },
> 			{ "name": "CPU", "value": "0.0" },
> 			{ "name": "Cumulative Network", "value": "0.0" },
> 			{ "name": "Cumulative Disk I/O", "value": "0.0" },
> 			{ "name": "Cumulative CPU", "value": "0.0" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 10,
> 		"type": "source",
> 		"pact": "Data Source",
> 		"contents": "CSV Input (,) /some/file/path",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "0.0" },
> 			{ "name": "CPU", "value": "0.0" },
> 			{ "name": "Cumulative Network", "value": "0.0" },
> 			{ "name": "Cumulative Disk I/O", "value": "0.0" },
> 			{ "name": "Cumulative CPU", "value": "0.0" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 8,
> 		"type": "pact",
> 		"pact": "Join",
> 		"contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$Join222",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 9, "side": "first", "ship_strategy": "Forward"},
> 			{"id": 10, "side": "second", "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "temp_mode": "CACHED"}
> 		],
> 		"driver_strategy": "Merge",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "(unknown)" },
> 			{ "name": "Disk I/O", "value": "(unknown)" },
> 			{ "name": "CPU", "value": "(unknown)" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 7,
> 		"type": "pact",
> 		"pact": "GroupReduce",
> 		"contents": "MIN(1)",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 8, "ship_strategy": "Forward"}
> 		],
> 		"driver_strategy": "Sorted Combine",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "0.0" },
> 			{ "name": "CPU", "value": "0.0" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 6,
> 		"type": "pact",
> 		"pact": "GroupReduce",
> 		"contents": "MIN(1)",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
> 		],
> 		"driver_strategy": "Sorted Group Reduce",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
> 			{ "name": "Partitioned on", "value": "[0]" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "[0:ASC]" },
> 			{ "name": "Grouped on", "value": "[0]" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "(unknown)" },
> 			{ "name": "Disk I/O", "value": "(unknown)" },
> 			{ "name": "CPU", "value": "(unknown)" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 5,
> 		"type": "pact",
> 		"pact": "Join",
> 		"contents": "eu.stratosphere.api.java.operators.JoinOperator$DefaultJoinFunction",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 6, "side": "first", "ship_strategy": "Forward"},
> 			{"id": 9, "side": "second", "ship_strategy": "Forward", "temp_mode": "PIPELINE_BREAKER"}
> 		],
> 		"driver_strategy": "Merge",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "(unknown)" },
> 			{ "name": "CPU", "value": "(unknown)" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 4,
> 		"type": "pact",
> 		"pact": "FlatMap",
> 		"contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$FlatMapJoin",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 5, "ship_strategy": "Forward"}
> 		],
> 		"driver_strategy": "Map",
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "0.0" },
> 			{ "name": "CPU", "value": "0.0" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	}
> 		],
> 		"partial_solution": 9,
> 		"next_partial_solution": 4,
> 		"id": 1,
> 		"type": "bulk_iteration",
> 		"pact": "Bulk Iteration",
> 		"contents": "Bulk Iteration",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 2, "ship_strategy": "Forward"}
> 		],
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "(unknown)" },
> 			{ "name": "Disk I/O", "value": "(unknown)" },
> 			{ "name": "CPU", "value": "(unknown)" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	},
> 	{
> 		"id": 0,
> 		"type": "sink",
> 		"pact": "Data Sink",
> 		"contents": "Print to System.out",
> 		"parallelism": "4",
> 		"subtasks_per_instance": "1",
> 		"predecessors": [
> 			{"id": 1, "ship_strategy": "Forward"}
> 		],
> 		"global_properties": [
> 			{ "name": "Partitioning", "value": "RANDOM" },
> 			{ "name": "Partitioning Order", "value": "(none)" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"local_properties": [
> 			{ "name": "Order", "value": "(none)" },
> 			{ "name": "Grouping", "value": "not grouped" },
> 			{ "name": "Uniqueness", "value": "not unique" }
> 		],
> 		"estimates": [
> 			{ "name": "Est. Output Size", "value": "(unknown)" },
> 			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
> 		"costs": [
> 			{ "name": "Network", "value": "0.0" },
> 			{ "name": "Disk I/O", "value": "0.0" },
> 			{ "name": "CPU", "value": "0.0" },
> 			{ "name": "Cumulative Network", "value": "(unknown)" },
> 			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
> 			{ "name": "Cumulative CPU", "value": "(unknown)" }
> 		],
> 		"compiler_hints": [
> 			{ "name": "Output Size (bytes)", "value": "(none)" },
> 			{ "name": "Output Cardinality", "value": "(none)" },
> 			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> 			{ "name": "Filter Factor", "value": "(none)" }		]
> 	}
> 	]
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message