flink-issues mailing list archives

From "Kaibo Zhou (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6955) Add operation log for Table
Date Tue, 20 Jun 2017 15:36:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055958#comment-16055958 ]

Kaibo Zhou commented on FLINK-6955:
-----------------------------------

[~twalthr], thanks for your comments.

The result of {{TableEnvironment#explain}} is difficult to map back to the actual expressions
written by the user, especially for complex jobs.

For example:

{code}
    val stream = env.fromCollection(data)
    val table = stream.toTable(tEnv, 'pk, 'a)

    val leftTable = table
      .select('pk as 'leftPk, 'a as 'leftA)
      .where('leftA < 3)
    val rightTable = table
      .select('pk as 'rightPk, 'a as 'rightA)
      .where('rightA > 3)
    val rightTableWithPk = rightTable
      .groupBy('rightPk)
      .select('rightPk, 'rightA.max as 'rightA)

    val resultTable = rightTableWithPk
      .join(leftTable)
      .where('leftPk === 'rightPk)
      .groupBy('leftPk)
      .select('leftPk, 'leftA.count)
    resultTable.toDataSet[Row]
{code}
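
For reference, the plan below is what {{TableEnvironment#explain}} returns for this job. A minimal sketch of the call, assuming the Flink 1.3 batch Table API:

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// `data` stands in for the collection used in the snippet above.
val data = Seq((1, 1), (2, 2), (3, 4), (4, 5))
val table = env.fromCollection(data).toTable(tEnv, 'pk, 'a)

// Derive resultTable from `table` exactly as in the snippet above
// (a trivial stand-in is used here so the sketch compiles).
val resultTable = table.where('a < 3)

// explain() returns the abstract syntax tree, the optimized logical
// plan and the physical execution plan in one string.
println(tEnv.explain(resultTable))
{code}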

The explained result:
{code}
== Abstract Syntax Tree ==
LogicalProject(leftPk=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_1=[COUNT($1)])
    LogicalProject(leftPk=[$2], leftA=[$3])
      LogicalFilter(condition=[=($2, $0)])
        LogicalJoin(condition=[true], joinType=[inner])
          LogicalProject(rightPk=[$0], rightA=[AS($1, 'rightA')])
            LogicalAggregate(group=[{0}], TMP_0=[MAX($1)])
              LogicalProject(rightPk=[$0], rightA=[$1])
                LogicalFilter(condition=[>($1, 3)])
                  LogicalProject(rightPk=[AS($0, 'rightPk')], rightA=[AS($1, 'rightA')])
                    LogicalTableScan(table=[[_DataSetTable_0]])
          LogicalFilter(condition=[<($1, 3)])
            LogicalProject(leftPk=[AS($0, 'leftPk')], leftA=[AS($1, 'leftA')])
              LogicalTableScan(table=[[_DataSetTable_0]])

== Optimized Logical Plan ==
DataSetCalc(select=[leftPk, TMP_1])
  DataSetJoin(where=[=(leftPk, rightPk)], join=[rightPk, leftPk, TMP_1], joinType=[InnerJoin])
    DataSetCalc(select=[rightPk])
      DataSetAggregate(groupBy=[rightPk], select=[rightPk, MAX(rightA) AS TMP_0])
        DataSetCalc(select=[pk AS rightPk, a AS rightA], where=[>(a, 3)])
          DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(groupBy=[leftPk], select=[leftPk, COUNT(leftA) AS TMP_1])
      DataSetCalc(select=[pk AS leftPk, a AS leftA], where=[<(a, 3)])
        DataSetScan(table=[[_DataSetTable_0]])

== Physical Execution Plan ==
Stage 8 : Data Source
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 7 : Map
		content : from: (pk, a)
		ship_strategy : Forward
		exchange_mode : BATCH
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 6 : FlatMap
			content : where: (>(a, 3)), select: (pk AS rightPk, a AS rightA)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : FlatMap
			Partitioning : RANDOM_PARTITIONED

			Stage 5 : GroupCombine
				content : groupBy: (rightPk), select: (rightPk, MAX(rightA) AS TMP_0)
				ship_strategy : Forward
				exchange_mode : PIPELINED
				driver_strategy : Sorted Combine
				Partitioning : RANDOM_PARTITIONED

				Stage 4 : GroupReduce
					content : groupBy: (rightPk), select: (rightPk, MAX(rightA) AS TMP_0)
					ship_strategy : Hash Partition on [0]
					exchange_mode : PIPELINED
					driver_strategy : Sorted Group Reduce
					Partitioning : RANDOM_PARTITIONED

					Stage 3 : FlatMap
						content : select: (rightPk)
						ship_strategy : Forward
						exchange_mode : PIPELINED
						driver_strategy : FlatMap
						Partitioning : RANDOM_PARTITIONED

						Stage 12 : Map
							content : from: (pk, a)
							ship_strategy : Forward
							exchange_mode : BATCH
							driver_strategy : Map
							Partitioning : RANDOM_PARTITIONED

							Stage 11 : FlatMap
								content : where: (<(a, 3)), select: (pk AS leftPk, a AS leftA)
								ship_strategy : Forward
								exchange_mode : PIPELINED
								driver_strategy : FlatMap
								Partitioning : RANDOM_PARTITIONED

								Stage 10 : GroupCombine
									content : groupBy: (leftPk), select: (leftPk, COUNT(leftA) AS TMP_1)
									ship_strategy : Forward
									exchange_mode : PIPELINED
									driver_strategy : Sorted Combine
									Partitioning : RANDOM_PARTITIONED

									Stage 9 : GroupReduce
										content : groupBy: (leftPk), select: (leftPk, COUNT(leftA) AS TMP_1)
										ship_strategy : Hash Partition on [0]
										exchange_mode : PIPELINED
										driver_strategy : Sorted Group Reduce
										Partitioning : RANDOM_PARTITIONED

										Stage 2 : Join
											content : where: (=(leftPk, rightPk)), join: (rightPk, leftPk, TMP_1)
											ship_strategy : Hash Partition on [0]
											exchange_mode : PIPELINED
											driver_strategy : Hybrid Hash (build: select: (rightPk) (id: 3))
											Partitioning : RANDOM_PARTITIONED

											Stage 1 : FlatMap
												content : select: (leftPk, TMP_1)
												ship_strategy : Forward
												exchange_mode : PIPELINED
												driver_strategy : FlatMap
												Partitioning : RANDOM_PARTITIONED

												Stage 0 : Data Sink
													content : org.apache.flink.api.java.io.DiscardingOutputFormat
													ship_strategy : Forward
													exchange_mode : PIPELINED
													Partitioning : RANDOM_PARTITIONED
{code}

If the number of nodes reaches the tens or hundreds, which is very common in production
environments, it takes a long time to work out what the user actually wrote.

If we could print an operation log instead, the result might look like this:

{code}
UnnamedTable$2 = UnnamedTable$0.select('pk as 'leftPk, 'a as 'leftA)
  .where('leftA < 3)
UnnamedTable$8 = UnnamedTable$0.select('pk as 'rightPk, 'a as 'rightA)
  .where('rightA > 3)
  .groupBy('rightPk)
  .select('rightPk, max('rightA) as 'rightA)
  .join(UnnamedTable$2)
  .where('leftPk === 'rightPk)
  .groupBy('leftPk)
  .select('leftPk, count('leftA))
{code}

This way, users can see at a glance what they wrote.
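
One way such a log could be collected is a small buffer inside the TableEnvironment that assigns every intermediate Table a generated name. A minimal sketch; {{OperationLog}}, {{logOperation}} and {{show}} are hypothetical names, not existing Flink API:

{code}
// Hypothetical sketch only -- Flink does not ship this API today.
class OperationLog {
  private val entries = scala.collection.mutable.ArrayBuffer.empty[String]
  private var counter = 0

  // Record that a new table is derived from `parent` via `op` and
  // return the generated name so later operations can refer to it.
  def logOperation(parent: String, op: String): String = {
    val name = s"UnnamedTable$$$counter"
    counter += 1
    entries += s"$name = $parent$op"
    name
  }

  // Render the recorded operations, one definition per line, which is
  // what a getLog-style accessor would return.
  def show(): String = entries.mkString("\n")
}
{code}

Every Table operation ({{select}}, {{where}}, {{join}}, ...) would call {{logOperation}} with the generated name of its input, so the final log reproduces the user's expressions in order.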

> Add operation log for Table
> ---------------------------
>
>                 Key: FLINK-6955
>                 URL: https://issues.apache.org/jira/browse/FLINK-6955
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> In some real production scenarios, the operations on a Table are very complicated and go
> through a number of steps. For example, a Table object is generated at the beginning of the
> program; while the program runs it is passed to different modules, and each module performs
> some operations on the Table, such as union, join or filter. At the end of the program,
> writeToSink or another operation is called.
> It would be helpful to record the operations on a Table and to be able to print them out.
> For example:
> {code}
>     val table1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
>     val table2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
>     val unionDs = table1.unionAll(table2.select('a, 'b, 'c)).filter('b < 2).select('c)
>     val results = unionDs.toDataStream[Row]
>     
>     val result = tEnv.getLog
>     val expected =
>       "UnnamedTable$1 = UnnamedTable$0.select('a, 'b, 'c)\n" +
>         "UnnamedTable$5 = UnnamedTable$2.unionAll(UnnamedTable$1)\n" +
>         "  .filter('b < 2)\n" +
>         "  .select('c)\n"
>     assertEquals(expected, result)
> {code}



