spark-dev mailing list archives

From parana <joao.par...@gmail.com>
Subject How to change the attribute order in Apache SparkSQL `Project` operator?
Date Mon, 05 Feb 2018 00:26:31 GMT
Below is my queryExecution.optimizedPlan before applying my rule:

    01 Project [x#9, p#10, q#11, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93]
    02 +- InMemoryRelation [x#9, p#10, q#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
    03    :  +- *SerializeFromObject [assertnotnull(input[0, eic.R0, true], top level non-flat input object).x AS x#9, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).p) AS p#10, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).q) AS q#11]
    04    :     +- *MapElements , obj#8: eic.R0
    05    :        +- *DeserializeToObject newInstance(class java.lang.Long), obj#7: java.lang.Long
    06    :           +- *Range (0, 3, step=1, splits=Some(2))

In line 01 I need to swap the positions of udfA and udfB, like this:

    01 Project [x#9, p#10, q#11, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28]

When I try to change the order of the attributes in a Project operator in
Spark SQL via a Catalyst optimization rule, the query result changes to an
invalid value. Maybe I'm not doing everything that is needed. I'm only
changing the order of the NamedExpression objects in the fields parameter:

    object ReorderColumnsOnProjectOptimizationRule extends Rule[LogicalPlan] {

      def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
        case Project(fields: Seq[NamedExpression], child) =>
          if (checkCondition(fields)) Project(newFieldsObject(fields), child)
          else Project(fields, child)
        case _ => plan
      }

      private def newFieldsObject(fields: Seq[NamedExpression]): Seq[NamedExpression] = {
        // compare the UDFs' computation costs and return the new NamedExpression list
        . . .
      }

      private def checkCondition(fields: Seq[NamedExpression]): Boolean = {
        // compare the UDFs' computation costs and return whether to change the order of the field list
        . . .
      }

      . . .
    }

Note: I'm registering my rule through the SparkSQL extraOptimizations setting:

    spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule)

Any suggestions would be of great help.
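A possible explanation, offered as an assumption rather than a confirmed diagnosis: the root Project's list defines the query's output schema, and as far as I understand a Dataset binds its deserializer to the analyzed plan's output columns by position, so reordering that list inside the optimizer shifts the physical columns under a reader that still expects the old order. Separately, `case _ => plan` returns the root `plan` passed to `apply` for every node that is not a Project, instead of leaving that node unchanged. Catalyst already leaves unmatched nodes alone, so the catch-all can be dropped (Spark's own optimizer rules generally use `transform` rather than `resolveOperators`).

The sketch below is a toy model in plain Scala, not Spark code: `Expr`, `Col`, `Udf`, `Named`, `Source`, `Project`, `naiveReorder`, `reorderPreservingOutput`, `readColumn`, and the sample data and costs are all hypothetical. It contrasts a naive reorder of the output list with an output-preserving variant that lists the UDF columns in cost order in an inner projection and restores the original column order with an outer projection of plain column references.

```scala
// Toy model in plain Scala, NOT Spark's Catalyst API. Every name here
// (Expr, Col, Udf, Named, Source, Project, ...) is hypothetical.
object ToyPlan {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  // Stand-in for a UDF call on one input column, with a made-up cost estimate.
  final case class Udf(cost: Int, arg: String, fn: Int => Int) extends Expr
  // `expr AS name`, playing the role of a NamedExpression.
  final case class Named(name: String, expr: Expr)

  sealed trait Plan { def output: Seq[String] }
  final case class Source(output: Seq[String], rows: Seq[Vector[Int]]) extends Plan
  final case class Project(list: Seq[Named], child: Plan) extends Plan {
    // The projection list alone decides the output column order.
    def output: Seq[String] = list.map(_.name)
  }

  // Evaluates a plan into positional rows whose columns follow `plan.output`.
  def execute(plan: Plan): Seq[Vector[Int]] = plan match {
    case Source(_, rows) => rows
    case Project(list, child) =>
      execute(child).map { row =>
        val env = child.output.zip(row).toMap
        list.map(n => eval(n.expr, env)).toVector
      }
  }

  private def eval(e: Expr, env: Map[String, Int]): Int = e match {
    case Col(name)       => env(name)
    case Udf(_, arg, fn) => fn(env(arg))
  }

  private def isUdf(n: Named): Boolean = n.expr.isInstanceOf[Udf]

  private def cost(n: Named): Int = n.expr match {
    case Udf(c, _, _) => c
    case _            => 0
  }

  // Naive rewrite: moves the UDF columns into cost order, changing the output order too.
  def naiveReorder(p: Project): Project = {
    val (udfs, others) = p.list.partition(isUdf)
    Project(others ++ udfs.sortBy(cost), p.child)
  }

  // Output-preserving rewrite: the inner Project lists the UDFs in cost order,
  // the outer Project restores the original order using plain column references.
  def reorderPreservingOutput(p: Project): Project = {
    val (udfs, others) = p.list.partition(isUdf)
    val inner = Project(others ++ udfs.sortBy(cost), p.child)
    Project(p.output.map(name => Named(name, Col(name))), inner)
  }

  // A reader bound by position to the columns it expects (the original output).
  def readColumn(expected: Seq[String], rows: Seq[Vector[Int]], name: String): Seq[Int] = {
    val idx = expected.indexOf(name)
    rows.map(row => row(idx))
  }

  // Made-up sample shaped like the plan in the question: x, p, q, udfB, udfA.
  val sample: Project = Project(
    Seq(
      Named("x", Col("x")),
      Named("p", Col("p")),
      Named("q", Col("q")),
      Named("udfB", Udf(10, "q", _ * 100)),
      Named("udfA", Udf(1, "p", _ + 1))
    ),
    Source(Seq("x", "p", "q"), Seq(Vector(0, 1, 2), Vector(1, 3, 4)))
  )

  def main(args: Array[String]): Unit = {
    val expected = sample.output
    for (plan <- Seq(sample, naiveReorder(sample), reorderPreservingOutput(sample)))
      println(readColumn(expected, execute(plan), "udfB"))
  }
}
```

In this model the naive rewrite hands udfA's values to a reader that asks for udfB, while the output-preserving variant keeps udfB's values in place. In Catalyst, the analogous rewrite would presumably look something like `Project(fields.map(_.toAttribute), Project(newFieldsObject(fields), child))`, which I have not tested. Since, as far as I know, extraOptimizations run in a fixed-point batch, the guard would also need to stop matching once the list is already in cost order, or the rule would keep nesting projections on every iteration.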



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/