flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Asterios Katsifodimos (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-1100) Optimization oportunity missed
Date Thu, 18 Sep 2014 13:58:34 GMT
Asterios Katsifodimos created FLINK-1100:
--------------------------------------------

             Summary: Optimization oportunity missed
                 Key: FLINK-1100
                 URL: https://issues.apache.org/jira/browse/FLINK-1100
             Project: Flink
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 0.6-incubating, 0.7-incubating
            Reporter: Asterios Katsifodimos
            Priority: Minor


The Optimizer does not see an optimization opportunity. 

The program I used is the transitive closure example from v0.7-incubating, in which I replaced the groupBy.reduce
with a simple distinct. 

The resulting plan (JSON here: https://gist.github.com/asteriosk/7a04cfd19537395eb401; also
included at the end of this report) misses an optimization opportunity: the sorted groupReduce could receive
an input partitioned on field 0 and sort on field 1 in order to apply the distinct function. As
a result, the partitioning (on field 0) could be reused to forward the results to the input of the
next iteration instead of repartitioning.

{code:javascript}
{
	"nodes": [
 
	{
		"id": 2,
		"type": "source",
		"pact": "Data Source",
		"contents": "[(1, 2), (2, 3), (2, 4), (3, 5), (6, 7), (8, 9), (8, 10), (5, 11), (11, 12), (10, 13), (9, 14), (13,",
		"parallelism": "1",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0" },
			{ "name": "Disk I/O", "value": "0.0" },
			{ "name": "CPU", "value": "0.0" },
			{ "name": "Cumulative Network", "value": "0.0" },
			{ "name": "Cumulative Disk I/O", "value": "0.0" },
			{ "name": "Cumulative CPU", "value": "0.0" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"step_function": [
	{
		"id": 6,
		"type": "pact",
		"pact": "Bulk Partial Solution",
		"contents": "Partial Solution",
		"parallelism": "4",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "[1:ASC]" },
			{ "name": "Grouped on", "value": "[1]" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0" },
			{ "name": "Disk I/O", "value": "0.0" },
			{ "name": "CPU", "value": "0.0" },
			{ "name": "Cumulative Network", "value": "0.0" },
			{ "name": "Cumulative Disk I/O", "value": "0.0" },
			{ "name": "Cumulative CPU", "value": "0.0" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 5,
		"type": "pact",
		"pact": "Join",
		"contents": "org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction",
		"parallelism": "4",
		"predecessors": [
			{"id": 6, "side": "first", "ship_strategy": "Hash Partition on [1]"},
			{"id": 2, "side": "second", "ship_strategy": "Hash Partition on [0]"}
		],
		"driver_strategy": "Hybrid Hash (CACHED) (build: [(1, 2), (2, 3), (2, 4), (3, 5), (6, 7), (8, 9), (8, 10), (5, 11), (11, 12), (10, 13), (9, 14), (13,)",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "(unknown)" },
			{ "name": "Disk I/O", "value": "(unknown)" },
			{ "name": "CPU", "value": "(unknown)" },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 4,
		"type": "pact",
		"pact": "Union",
		"contents": "",
		"parallelism": "4",
		"predecessors": [
			{"id": 5, "side": "first", "ship_strategy": "Hash Partition on [0, 1]"},
			{"id": 6, "side": "second", "ship_strategy": "Hash Partition on [0, 1]"}
		],
		"global_properties": [
			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
			{ "name": "Partitioned on", "value": "[0, 1]" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 3,
		"type": "pact",
		"pact": "GroupReduce",
		"contents": "org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction",
		"parallelism": "4",
		"predecessors": [
			{"id": 5, "ship_strategy": "Hash Partition on [0, 1]"},
			{"id": 6, "ship_strategy": "Hash Partition on [0, 1]"}
		],
		"driver_strategy": "Sorted Group Reduce",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0" },
			{ "name": "Disk I/O", "value": "(unknown)" },
			{ "name": "CPU", "value": "(unknown)" },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	}
		],
		"partial_solution": 6,
		"next_partial_solution": 3,
		"id": 1,
		"type": "bulk_iteration",
		"pact": "Bulk Iteration",
		"contents": "Bulk Iteration",
		"parallelism": "4",
		"predecessors": [
			{"id": 2, "ship_strategy": "Redistribute", "local_strategy": "Sort on [1:ASC]"}
		],
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "(unknown)" },
			{ "name": "Disk I/O", "value": "(unknown)" },
			{ "name": "CPU", "value": "(unknown)" },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 0,
		"type": "sink",
		"pact": "Data Sink",
		"contents": "Print to System.out",
		"parallelism": "4",
		"predecessors": [
			{"id": 1, "ship_strategy": "Forward"}
		],
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0" },
			{ "name": "Disk I/O", "value": "0.0" },
			{ "name": "CPU", "value": "0.0" },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	}
	]
}
{code}








--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message