spark-reviews mailing list archives

From HyukjinKwon <...@git.apache.org>
Subject [GitHub] spark pull request #15072: [SPARK-17123][SQL] Use type-widen encoder for Dat...
Date Tue, 13 Sep 2016 05:54:45 GMT
GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/15072

    [SPARK-17123][SQL] Use type-widen encoder for DataFrame rather than existing encoder to allow type-widening from set operations

    ## What changes were proposed in this pull request?
    
    This PR fixes set operations on `DataFrame` so that they no longer fail when the column types are non-Scala-native (e.g. `TimestampType`, `DateType`, and `DecimalType`).

    The problem is that set operations such as `union`, `intersect`, and `except` use the encoder belonging to the caller's `Dataset`.

    That is, the caller's `Dataset` keeps its original `ExpressionEncoder[Row]` when the set operation is performed, but the result types may actually be widened. We should therefore use an `ExpressionEncoder[Row]` constructed from the executed plan rather than the existing one; otherwise `StaticInvoke` generates invalid code.
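    The widening applied by set operations can be modeled without Spark. The sketch below is plain Scala; `widen`, `widenSchema`, and the type names are illustrative stand-ins for Catalyst's type-coercion rules, not Spark's actual API. It shows why the result schema of such a union differs from the caller's schema, so the caller's encoder cannot be reused as-is:

```scala
// Hypothetical, simplified model of Catalyst's type widening for set
// operations. Names below are stand-ins, not Spark's API.
sealed trait SqlType
case object DateType extends SqlType
case object TimestampType extends SqlType
case object DecimalType extends SqlType
case object DoubleType extends SqlType
case object StringType extends SqlType

// Widen a pair of column types to their common wider type, if any.
def widen(a: SqlType, b: SqlType): Option[SqlType] = (a, b) match {
  case (x, y) if x == y => Some(x)
  case (DateType, TimestampType) | (TimestampType, DateType) => Some(TimestampType)
  case (DecimalType, DoubleType) | (DoubleType, DecimalType) => Some(DoubleType)
  case (StringType, _) | (_, StringType) => Some(StringType) // string absorbs atomics
  case _ => None
}

// Widen a whole schema column by column; fail if any pair cannot widen.
def widenSchema(a: Seq[SqlType], b: Seq[SqlType]): Option[Seq[SqlType]] = {
  val widened = a.zip(b).map { case (x, y) => widen(x, y) }
  if (widened.forall(_.isDefined)) Some(widened.flatten) else None
}
```

    Under these rules, a caller schema of `(DateType, DecimalType, TimestampType)` unioned with `(TimestampType, DoubleType, StringType)` widens to `(TimestampType, DoubleType, StringType)`, which is why the result encoder must be rebuilt from the widened plan rather than taken from the caller.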
    
    Running the code below:
    
    ```scala
    val dates = Seq(
      (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
      (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
    ).toDF("date", "decimal", "timestamp")
    
    val widenTypedRows = Seq(
      (new Timestamp(2), 10.5D, "string")
    ).toDF("date", "decimal", "timestamp")
    
    val results = dates.union(widenTypedRows).collect()
    results.foreach(println)
    ```
    
    fails before this patch and succeeds after it:

    **Before** (code generation produces invalid Java; note `value4.toJavaBigDecimal()` called on a primitive `double` at generated line 044):
    
    ```java
    /* 001 */ public java.lang.Object generate(Object[] references) {
    /* 002 */   return new SpecificSafeProjection(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
    /* 006 */
    /* 007 */   private Object[] references;
    /* 008 */   private MutableRow mutableRow;
    /* 009 */   private Object[] values;
    /* 010 */   private org.apache.spark.sql.types.StructType schema;
    /* 011 */
    /* 012 */
    /* 013 */   public SpecificSafeProjection(Object[] references) {
    /* 014 */     this.references = references;
    /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
    /* 016 */
    /* 017 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
    /* 018 */   }
    /* 019 */
    /* 020 */   public java.lang.Object apply(java.lang.Object _i) {
    /* 021 */     InternalRow i = (InternalRow) _i;
    /* 022 */
    /* 023 */     values = new Object[3];
    /* 024 */
    /* 025 */     boolean isNull2 = i.isNullAt(0);
    /* 026 */     long value2 = isNull2 ? -1L : (i.getLong(0));
    /* 027 */     boolean isNull1 = isNull2;
    /* 028 */     final java.sql.Date value1 = isNull1 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(value2);
    /* 029 */     isNull1 = value1 == null;
    /* 030 */     if (isNull1) {
    /* 031 */       values[0] = null;
    /* 032 */     } else {
    /* 033 */       values[0] = value1;
    /* 034 */     }
    /* 035 */
    /* 036 */     boolean isNull4 = i.isNullAt(1);
    /* 037 */     double value4 = isNull4 ? -1.0 : (i.getDouble(1));
    /* 038 */
    /* 039 */     boolean isNull3 = isNull4;
    /* 040 */     java.math.BigDecimal value3 = null;
    /* 041 */     if (!isNull3) {
    /* 042 */
    /* 043 */       Object funcResult = null;
    /* 044 */       funcResult = value4.toJavaBigDecimal();
    /* 045 */       if (funcResult == null) {
    /* 046 */         isNull3 = true;
    /* 047 */       } else {
    /* 048 */         value3 = (java.math.BigDecimal) funcResult;
    /* 049 */       }
    /* 050 */
    /* 051 */     }
    /* 052 */     isNull3 = value3 == null;
    /* 053 */     if (isNull3) {
    /* 054 */       values[1] = null;
    /* 055 */     } else {
    /* 056 */       values[1] = value3;
    /* 057 */     }
    /* 058 */
    /* 059 */     boolean isNull6 = i.isNullAt(2);
    /* 060 */     UTF8String value6 = isNull6 ? null : (i.getUTF8String(2));
    /* 061 */     boolean isNull5 = isNull6;
    /* 062 */     final java.sql.Timestamp value5 = isNull5 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaTimestamp(value6);
    /* 063 */     isNull5 = value5 == null;
    /* 064 */     if (isNull5) {
    /* 065 */       values[2] = null;
    /* 066 */     } else {
    /* 067 */       values[2] = value5;
    /* 068 */     }
    /* 069 */
    /* 070 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, schema);
    /* 071 */     if (false) {
    /* 072 */       mutableRow.setNullAt(0);
    /* 073 */     } else {
    /* 074 */
    /* 075 */       mutableRow.update(0, value);
    /* 076 */     }
    /* 077 */
    /* 078 */     return mutableRow;
    /* 079 */   }
    /* 080 */ }
    ```
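    The invalid call is at generated line 044: after widening, the column holds a primitive `double`, but the deserializer, built from the caller's pre-widening encoder, still expects a `Decimal`-like value. A plain-Scala illustration of the mismatch (no Spark required; `FakeDecimal` and `deserializeDecimal` are hypothetical stand-ins for Catalyst's `Decimal` and its deserializer):

```scala
// Hypothetical stand-in for Catalyst's Decimal wrapper.
final case class FakeDecimal(underlying: java.math.BigDecimal) {
  def toJavaBigDecimal: java.math.BigDecimal = underlying
}

// The caller's deserializer assumes the column value is Decimal-like:
def deserializeDecimal(v: FakeDecimal): java.math.BigDecimal = v.toJavaBigDecimal

// Fine when the runtime value really is Decimal-like:
val ok = deserializeDecimal(FakeDecimal(java.math.BigDecimal.valueOf(1)))

// But after widening, the column is a primitive double, so the generated
// equivalent of `value4.toJavaBigDecimal()` cannot even compile:
//   val value4: Double = 10.5
//   value4.toJavaBigDecimal   // error: toJavaBigDecimal is not a member of Double
```

    This is why the generated projection above fails: the code shape comes from the old encoder while the runtime values follow the widened schema.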
    
    
    **After** (the union succeeds and prints the widened rows):
    
    ```text
    [1969-12-31 00:00:00.0,1.0,1969-12-31 16:00:00.002]
    [1969-12-31 00:00:00.0,4.0,1969-12-31 16:00:00.005]
    [1969-12-31 16:00:00.002,10.5,string]
    ```
    
    
    
    
    ## How was this patch tested?
    
    Unit tests were added in `DataFrameSuite`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-17123

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15072.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15072
    
----
commit 613181081e8edac494205c9f40d2bce5e99b86fc
Author: hyukjinkwon <gurwls223@gmail.com>
Date:   2016-09-13T05:36:00Z

    Use type-widen encoders for DataFrame

----


