spark-issues mailing list archives

From "Reynold Xin (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-19846) Add a flag to disable constraint propagation
Date Fri, 24 Mar 2017 23:05:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin resolved SPARK-19846.
---------------------------------
       Resolution: Fixed
         Assignee: Liang-Chi Hsieh
    Fix Version/s: 2.2.0

> Add a flag to disable constraint propagation
> --------------------------------------------
>
>                 Key: SPARK-19846
>                 URL: https://issues.apache.org/jira/browse/SPARK-19846
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Liang-Chi Hsieh
>            Assignee: Liang-Chi Hsieh
>             Fix For: 2.2.0
>
>
> Constraint propagation can be computationally expensive and can block the driver for a long time. For example, the benchmark below takes 30 minutes.
> Compared with other attempts to change how constraint propagation works, this is a much simpler option: add a flag to disable constraint propagation.
> {code}
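>     import org.apache.spark.ml.{Pipeline, PipelineStage}
>     import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
>     import org.apache.spark.sql.internal.SQLConf
>     // Disable constraint propagation via the flag added by this issue.
>     spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)
>     // Build a 3-row DataFrame with an id column and 41 string columns (x0 plus 40 copies).
>     val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))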
>     val indexers = df.columns.tail.map(c => new StringIndexer()
>       .setInputCol(c)
>       .setOutputCol(s"${c}_indexed")
>       .setHandleInvalid("skip"))
>     val encoders = indexers.map(indexer => new OneHotEncoder()
>       .setInputCol(indexer.getOutputCol)
>       .setOutputCol(s"${indexer.getOutputCol}_encoded")
>       .setDropLast(true))
>     val stages: Array[PipelineStage] = indexers ++ encoders
>     val pipeline = new Pipeline().setStages(stages)
>     val startTime = System.nanoTime
>     pipeline.fit(df).transform(df).show
>     val runningTime = System.nanoTime - startTime
> {code}
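
To compare the effect of the flag, the same workload can be timed with constraint propagation on and off. The following is a minimal sketch, not part of the original issue: it assumes the flag's key is `spark.sql.constraintPropagation.enabled` (the string behind `SQLConf.CONSTRAINT_PROPAGATION_ENABLED` in Spark 2.2.0), and the `timed` helper and the placeholder workload are hypothetical names introduced here for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("constraint-propagation-flag")
  .getOrCreate()

// Assumed key behind SQLConf.CONSTRAINT_PROPAGATION_ENABLED (Spark 2.2.0+).
val flagKey = "spark.sql.constraintPropagation.enabled"

// Hypothetical helper: evaluate `body` and return (result, elapsed seconds).
def timed[A](body: => A): (A, Double) = {
  val start = System.nanoTime
  val result = body
  (result, (System.nanoTime - start) / 1e9)
}

// Run the same workload once with the flag enabled and once with it disabled.
for (enabled <- Seq(true, false)) {
  spark.conf.set(flagKey, enabled)
  val (_, seconds) = timed {
    // Placeholder workload; substitute pipeline.fit(df).transform(df).show
    spark.range(10).count()
  }
  println(f"$flagKey=$enabled: $seconds%.2f s")
}
```

Setting the flag through the string key avoids importing the internal `SQLConf` class; the trade-off is that a typo in the key is not caught at compile time.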



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)