spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #18540: [SPARK-19451][SQL] rangeBetween method should acc...
Date Fri, 28 Jul 2017 12:01:04 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18540#discussion_r130073540
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
---
    @@ -106,161 +113,176 @@ case class WindowSpecReference(name: String) extends WindowSpec
     /**
      * The trait used to represent the type of a Window Frame.
      */
    -sealed trait FrameType
    +sealed trait FrameType {
    +  def inputType: AbstractDataType
    +  def sql: String
    +}
     
     /**
    - * RowFrame treats rows in a partition individually. When a [[ValuePreceding]]
    - * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
    - * as a physical offset.
    + * RowFrame treats rows in a partition individually. Values used in a row frame are considered
    + * to be physical offsets.
      * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
      * from the row that precedes the current row to the row that follows the current row.
      */
    -case object RowFrame extends FrameType
    +case object RowFrame extends FrameType {
    +  override def inputType: AbstractDataType = IntegerType
    +  override def sql: String = "ROWS"
    +}
     
     /**
    - * RangeFrame treats rows in a partition as groups of peers.
    - * All rows having the same `ORDER BY` ordering are considered as peers.
    - * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its [[FrameBoundary]],
    - * the value is considered as a logical offset.
    + * RangeFrame treats rows in a partition as groups of peers. All rows having the same `ORDER BY`
    + * ordering are considered as peers. Values used in a range frame are considered to be logical
    + * offsets.
      * For example, assuming the value of the current row's `ORDER BY` expression `expr` is `v`,
      * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing rows whose values
      * `expr` are in the range of [v-1, v+1].
      *
      * If `ORDER BY` clause is not defined, all rows in the partition are considered as peers
      * of the current row.
      */
    -case object RangeFrame extends FrameType
    -
    -/**
    - * The trait used to represent the type of a Window Frame Boundary.
    - */
    -sealed trait FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean
    +case object RangeFrame extends FrameType {
    +  override def inputType: AbstractDataType = TypeCollection.NumericAndInterval
    +  override def sql: String = "RANGE"
     }
     
     /**
    - * Extractor for making working with frame boundaries easier.
    + * The trait used to represent special boundaries used in a window frame.
      */
    -object FrameBoundary {
    -  def apply(boundary: FrameBoundary): Option[Int] = unapply(boundary)
    -  def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
    -    case CurrentRow => Some(0)
    -    case ValuePreceding(offset) => Some(-offset)
    -    case ValueFollowing(offset) => Some(offset)
    -    case _ => None
    -  }
    +sealed trait SpecialFrameBoundary extends Expression with Unevaluable {
    +  override lazy val children: Seq[Expression] = Nil
    +  override def dataType: DataType = NullType
    +  override def foldable: Boolean = false
    +  override def nullable: Boolean = false
    +
    +  def notFollows(other: Expression): Boolean
     }
     
    -/** UNBOUNDED PRECEDING boundary. */
    -case object UnboundedPreceding extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => true
    -    case vp: ValuePreceding => true
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    +/** UNBOUNDED boundary. */
    +case object UnboundedPreceding extends SpecialFrameBoundary {
    +  override def sql: String = "UNBOUNDED PRECEDING"
     
    -  override def toString: String = "UNBOUNDED PRECEDING"
    +  override def notFollows(other: Expression): Boolean = true
     }
     
    -/** <value> PRECEDING boundary. */
    -case class ValuePreceding(value: Int) extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case ValuePreceding(anotherValue) => value >= anotherValue
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    +case object UnboundedFollowing extends SpecialFrameBoundary {
    +  override def sql: String = "UNBOUNDED FOLLOWING"
    +
    +  override def notFollows(other: Expression): Boolean = other match {
         case UnboundedFollowing => true
    +    case _ => false
       }
    -
    -  override def toString: String = s"$value PRECEDING"
     }
     
     /** CURRENT ROW boundary. */
    -case object CurrentRow extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = "CURRENT ROW"
    -}
    +case object CurrentRow extends SpecialFrameBoundary {
    +  override def sql: String = "CURRENT ROW"
     
    -/** <value> FOLLOWING boundary. */
    -case class ValueFollowing(value: Int) extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => false
    -    case ValueFollowing(anotherValue) => value <= anotherValue
    +  override def notFollows(other: Expression): Boolean = other match {
         case UnboundedFollowing => true
    +    case _ => false
       }
    -
    -  override def toString: String = s"$value FOLLOWING"
    -}
    -
    -/** UNBOUNDED FOLLOWING boundary. */
    -case object UnboundedFollowing extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => false
    -    case vf: ValueFollowing => false
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = "UNBOUNDED FOLLOWING"
     }
     
     /**
      * Represents a window frame.
      */
    -sealed trait WindowFrame
    +sealed trait WindowFrame extends Expression with Unevaluable {
    +  override lazy val children: Seq[Expression] = Nil
    +  override def dataType: DataType = throw new UnsupportedOperationException("dataType")
    +  override def foldable: Boolean = false
    +  override def nullable: Boolean = false
    +}
     
     /** Used as a placeholder when a frame specification is not defined. */
     case object UnspecifiedFrame extends WindowFrame
     
    -/** A specified Window Frame. */
    +/**
    + * A specified Window Frame. The val lower/upper can be either a foldable [[Expression]] or a
    + * [[SpecialFrameBoundary]].
    + */
     case class SpecifiedWindowFrame(
         frameType: FrameType,
    -    frameStart: FrameBoundary,
    -    frameEnd: FrameBoundary) extends WindowFrame {
    -
    -  /** If this WindowFrame is valid or not. */
    -  def validate: Option[String] = (frameType, frameStart, frameEnd) match {
    -    case (_, UnboundedFollowing, _) =>
    -      Some(s"$UnboundedFollowing is not allowed as the start of a Window Frame.")
    -    case (_, _, UnboundedPreceding) =>
    -      Some(s"$UnboundedPreceding is not allowed as the end of a Window Frame.")
    -    // case (RowFrame, start, end) => ??? RowFrame specific rule
    -    // case (RangeFrame, start, end) => ??? RangeFrame specific rule
    -    case (_, start, end) =>
    -      if (start.notFollows(end)) {
    -        None
    -      } else {
    -        val reason =
    -          s"The end of this Window Frame $end is smaller than the start of " +
    -          s"this Window Frame $start."
    -        Some(reason)
    -      }
    +    lower: Expression,
    +    upper: Expression)
    +  extends WindowFrame {
    +
    +  override lazy val children: Seq[Expression] = lower :: upper :: Nil
    +
    +  lazy val valueBoundary: Seq[Expression] =
    +    children.filterNot(_.isInstanceOf[SpecialFrameBoundary])
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    // Check lower value.
    +    val lowerCheck = checkBoundary(lower, "lower")
    +    if (lowerCheck.isFailure) {
    +      return lowerCheck
    +    }
    +
    +    // Check upper value.
    +    val upperCheck = checkBoundary(upper, "upper")
    +    if (upperCheck.isFailure) {
    +      return upperCheck
    +    }
    +
    +    // Check combination (of expressions).
    +    (lower, upper) match {
    +      case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
    +      case (_, u: SpecialFrameBoundary) => TypeCheckSuccess
    +      case (l: Expression, u: Expression) if l.dataType != u.dataType =>
    +        TypeCheckFailure(
    +          s"Window frame bounds '$lower' and '$upper' do no not have the same data type: " +
    +            s"'${l.dataType.catalogString}' <> '${u.dataType.catalogString}'")
    +      case (l: Expression, u: Expression) if isGreaterThan(l, u) =>
    +        TypeCheckFailure(
    +          "The lower bound of a window frame must less than or equal to the upper bound")
    +      case _ => TypeCheckSuccess
    +    }
       }
     
    -  override def toString: String = frameType match {
    -    case RowFrame => s"ROWS BETWEEN $frameStart AND $frameEnd"
    -    case RangeFrame => s"RANGE BETWEEN $frameStart AND $frameEnd"
    +  override def sql: String = {
    +    val lowerSql = boundarySql(lower)
    +    val upperSql = boundarySql(upper)
    +    s"${frameType.sql} BETWEEN $lowerSql AND $upperSql"
    +  }
    +
    +  def isUnbounded: Boolean = lower == UnboundedPreceding && upper == UnboundedFollowing
    +
    +  def isValueBound: Boolean = valueBoundary.nonEmpty
    +
    +  def isOffset: Boolean = (lower, upper) match {
    +    case (l: Expression, u: Expression) => frameType == RowFrame && l == u
    +    case _ => false
    +  }
    +
    +  private def boundarySql(expr: Expression): String = expr match {
    +    case e: SpecialFrameBoundary => e.sql
    +    case UnaryMinus(n) => n.sql + " PRECEDING"
    +    case e: Expression => e.sql + " FOLLOWING"
    +  }
    +
    +  private def isGreaterThan(l: Expression, r: Expression): Boolean = {
    +    GreaterThan(l, r).eval().asInstanceOf[Boolean]
    +  }
    +
    +  private def checkBoundary(b: Expression, location: String): TypeCheckResult = b match {
    --- End diff --
    
    nit:
    ```
    b match {
      case _: SpecialFrameBoundary => success
      case _ if !b.foldable => fail
      case _ if !frameType.inputType.acceptsType(b.dataType) => fail
      case _ => success
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message