spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rxin <...@git.apache.org>
Subject [GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Date Wed, 23 Nov 2016 05:31:47 GMT
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15959#discussion_r89258401
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
---
    @@ -58,60 +61,170 @@ case class Statistics(
       }
     }
     
    +
     /**
    - * Statistics for a column.
    + * Statistics collected for a column.
    + *
    + * 1. Supported data types are defined in `ColumnStat.supportsType`.
    + * 2. The JVM data type stored in min/max is the external data type (used in Row) for
the
    + * corresponding Catalyst data type. For example, for DateType we store java.sql.Date,
and for
    + * TimestampType we store java.sql.Timestamp.
    + * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
    + * 4. There is no guarantee that the statistics collected are accurate. Approximation
algorithms
    + *    (sketches) might have been used, and the data collected can also be stale.
    + *
    + * @param distinctCount number of distinct values
    + * @param min minimum value
    + * @param max maximum value
    + * @param nullCount number of nulls
    + * @param avgLen average length of the values. For fixed-length types, this should be
a constant.
    + * @param maxLen maximum length of the values. For fixed-length types, this should be
a constant.
      */
    -case class ColumnStat(statRow: InternalRow) {
    +case class ColumnStat(
    +    distinctCount: BigInt,
    +    min: Option[Any],
    +    max: Option[Any],
    +    nullCount: BigInt,
    +    avgLen: Long,
    +    maxLen: Long) {
     
    -  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
    -    NumericColumnStat(statRow, dataType)
    -  }
    -  def forString: StringColumnStat = StringColumnStat(statRow)
    -  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
    -  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
    +  /**
    +   * Returns a map from string to string that can be used to serialize the column stats.
    +   * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string
    +   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
    +   *
    +   * As part of the protocol, the returned map always contains a key called "version".
    +   * If the min/max values are null (None), they will be stored as "<null>".
    +   */
    +  def toMap: Map[String, String] = Map(
    +    ColumnStat.KEY_VERSION -> "1",
    +    ColumnStat.KEY_DISTINCT_COUNT -> distinctCount.toString,
    +    ColumnStat.KEY_MIN_VALUE -> min.map(_.toString).getOrElse(ColumnStat.NULL_STRING),
    +    ColumnStat.KEY_MAX_VALUE -> max.map(_.toString).getOrElse(ColumnStat.NULL_STRING),
    +    ColumnStat.KEY_NULL_COUNT -> nullCount.toString,
    +    ColumnStat.KEY_AVG_LEN -> avgLen.toString,
    +    ColumnStat.KEY_MAX_LEN -> maxLen.toString
    +  )
    +}
    +
    +
    +object ColumnStat extends Logging {
    +
    +  /** String representation for null in serialization. */
    +  private val NULL_STRING: String = "<null>"
    +
    +  // List of string keys used to serialize ColumnStat
    +  val KEY_VERSION = "version"
    +  private val KEY_DISTINCT_COUNT = "distinctCount"
    +  private val KEY_MIN_VALUE = "min"
    +  private val KEY_MAX_VALUE = "max"
    +  private val KEY_NULL_COUNT = "nullCount"
    +  private val KEY_AVG_LEN = "avgLen"
    +  private val KEY_MAX_LEN = "maxLen"
     
    -  override def toString: String = {
    -    // use Base64 for encoding
    -    Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
    +  /** Returns true iff we support gathering column statistics on a column of the given
type. */
    +  def supportsType(dataType: DataType): Boolean = dataType match {
    +    case _: IntegralType => true
    +    case _: DecimalType => true
    +    case DoubleType | FloatType => true
    +    case BooleanType => true
    +    case DateType => true
    +    case TimestampType => true
    +    case BinaryType | StringType => true
    +    case _ => false
       }
    -}
     
    -object ColumnStat {
    -  def apply(numFields: Int, str: String): ColumnStat = {
    -    // use Base64 for decoding
    -    val bytes = Base64.decodeBase64(str)
    -    val unsafeRow = new UnsafeRow(numFields)
    -    unsafeRow.pointTo(bytes, bytes.length)
    -    ColumnStat(unsafeRow)
    +  /**
    +   * Creates a [[ColumnStat]] object from the given map. This is used to deserialize
column stats
    +   * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
    +   */
    +  def fromMap(table: String, field: StructField, map: Map[String, String])
    +    : Option[ColumnStat] = {
    +    val str2val: (String => Any) = field.dataType match {
    +      case _: IntegralType => _.toLong
    +      case _: DecimalType => new java.math.BigDecimal(_)
    +      case DoubleType | FloatType => _.toDouble
    +      case BooleanType => _.toBoolean
    +      case DateType => java.sql.Date.valueOf
    +      case TimestampType => java.sql.Timestamp.valueOf
    +      case BinaryType | StringType => (v: String) => if (v == NULL_STRING) null
else v
    --- End diff --
    
    I updated it to be more future-proof — so even if the bytes were serialized by a future version
of Spark, reading them with the current version won't crash the system.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message