spark-reviews mailing list archives

From WeichenXu123 <...@git.apache.org>
Subject [GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Date Fri, 16 Mar 2018 04:14:54 GMT
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20829#discussion_r174991898
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
    @@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
       @Since("1.4.0")
       def setOutputCol(value: String): this.type = set(outputCol, value)
     
    +  /** @group setParam */
    +  @Since("1.6.0")
    +  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
    +
    +  /**
    +   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
    +   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
    +   * output).
    +   * Default: "error"
    +   * @group param
    +   */
    +  @Since("1.6.0")
    +  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
    +    "Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with
" +
    +      "invalid data), 'error' (throw an error), or 'keep' (return relevant number of
NaN " +
    +      "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
    +
    +  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
    +
       @Since("2.0.0")
       override def transform(dataset: Dataset[_]): DataFrame = {
         transformSchema(dataset.schema, logging = true)
         // Schema transformation.
         val schema = dataset.schema
    -    lazy val first = dataset.toDF.first()
    -    val attrs = $(inputCols).flatMap { c =>
    +
    +    val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c =>
           val field = schema(c)
    -      val index = schema.fieldIndex(c)
           field.dataType match {
    -        case DoubleType =>
    -          val attr = Attribute.fromStructField(field)
    -          // If the input column doesn't have ML attribute, assume numeric.
    -          if (attr == UnresolvedAttribute) {
    -            Some(NumericAttribute.defaultAttr.withName(c))
    -          } else {
    -            Some(attr.withName(c))
    -          }
    -        case _: NumericType | BooleanType =>
    -          // If the input column type is a compatible scalar type, assume numeric.
    -          Some(NumericAttribute.defaultAttr.withName(c))
             case _: VectorUDT =>
    -          val group = AttributeGroup.fromStructField(field)
    -          if (group.attributes.isDefined) {
    -            // If attributes are defined, copy them with updated names.
    -            group.attributes.get.zipWithIndex.map { case (attr, i) =>
    +          val attributeGroup = AttributeGroup.fromStructField(field)
    +          var length = attributeGroup.size
    +          val isMissingNumAttrs = -1 == length
    +          if (isMissingNumAttrs && dataset.isStreaming) {
    +            // this condition is checked for every column, but should be cheap
    +            throw new RuntimeException(
    +              s"""
    +                 |VectorAssembler cannot dynamically determine the size of vectors for streaming
    +                 |data. Consider applying VectorSizeHint to ${c} so that this transformer can be
    +                 |used to transform streaming inputs.
    +           """.stripMargin.replaceAll("\n", " "))
    +          }
    +          if (isMissingNumAttrs) {
    +            val column = dataset.select(c).na.drop()
    +            // column count is a spark job for every column missing num attrs
    +            length = (column.count() > 0, $(handleInvalid)) match {
    +              // column first is the second spark job for every column missing num attrs
    +              case (true, _) => column.first.getAs[Vector](0).size
    +              case (false, VectorAssembler.SKIP_INVALID | VectorAssembler.ERROR_INVALID) => 0
    +              case (false, _) =>
    +                throw new RuntimeException(
    +                  s"""
    +                     |VectorAssembler cannot determine the size of empty vectors. Consider applying
    +                     |VectorSizeHint to ${c} so that this transformer can be used to transform empty
    +                     |columns.
    +               """.stripMargin.replaceAll("\n", " "))
    --- End diff --
    
    I think in this case, `VectorSizeHint` also cannot help to provide the vector size.
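
    For context on that suggestion: `VectorSizeHint` (in org.apache.spark.ml.feature since Spark 2.3)
    stamps a fixed vector size onto a column's ML metadata, so that VectorAssembler does not have to
    inspect the data at all. A minimal sketch of that usage, assuming a SparkSession `spark`; the
    column name `userFeatures` and the size 3 are illustrative, not taken from the patch:

        import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
        import org.apache.spark.ml.linalg.Vectors

        // Hypothetical input: a vector column carrying no size metadata.
        val df = spark.createDataFrame(Seq(
          (0, Vectors.dense(1.0, 2.0, 3.0))
        )).toDF("id", "userFeatures")

        // Declare the size up front so the assembler need not scan the column.
        val sizeHint = new VectorSizeHint()
          .setInputCol("userFeatures")
          .setSize(3)

        val assembler = new VectorAssembler()
          .setInputCols(Array("userFeatures"))
          .setOutputCol("features")

        assembler.transform(sizeHint.transform(df)).show()

    This only covers declaring a size that is already known; whether it can help when the column is
    empty and no size is known is exactly what the comment above questions.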

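    The param added by this diff would be exercised roughly as in the following sketch, assuming a
    SparkSession `spark` and a build that includes this patch; the data and column names are
    illustrative:

        import org.apache.spark.ml.feature.VectorAssembler

        // Hypothetical frame with a NULL in column "a".
        val df = spark.createDataFrame(Seq(
          (0, Some(1.0), 2.0),
          (1, None: Option[Double], 3.0)
        )).toDF("id", "a", "b")

        val assembler = new VectorAssembler()
          .setInputCols(Array("a", "b"))
          .setOutputCol("features")

        // "error" (the default) throws on the NULL row,
        // "skip" filters that row out, and
        // "keep" emits NaN in its place in the assembled vector.
        assembler.setHandleInvalid("skip").transform(df).show()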
