spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-9020][SQL] Support mutable state in code gen expressions
Date Wed, 15 Jul 2015 17:32:33 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6f6902597 -> fa4ec3606


[SPARK-9020][SQL] Support mutable state in code gen expressions

We can keep expressions' mutable state in generated classes (like `SpecificProjection`) as member
variables, so that we can read and modify them inside code-generated expressions.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7392 from cloud-fan/mutable-state and squashes the following commits:

eb3a221 [Wenchen Fan] fix order
73144d8 [Wenchen Fan] naming improvement
318f41d [Wenchen Fan] address more comments
d43b65d [Wenchen Fan] address comments
fd45c7a [Wenchen Fan] Support mutable state in code gen expressions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa4ec360
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa4ec360
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa4ec360

Branch: refs/heads/master
Commit: fa4ec3606a965238423f977808163983c9d56e0a
Parents: 6f69025
Author: Wenchen Fan <cloud0fan@outlook.com>
Authored: Wed Jul 15 10:31:39 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Jul 15 10:31:39 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskContext.scala    | 15 ++-
 .../expressions/codegen/CodeGenerator.scala     | 17 +++-
 .../codegen/GenerateMutableProjection.scala     |  4 +
 .../expressions/codegen/GenerateOrdering.scala  | 38 ++++++--
 .../expressions/codegen/GeneratePredicate.scala |  4 +
 .../codegen/GenerateProjection.scala            | 99 ++++++++++----------
 .../spark/sql/catalyst/expressions/random.scala | 29 +++++-
 .../expressions/MonotonicallyIncreasingID.scala | 19 +++-
 .../expressions/SparkPartitionID.scala          | 12 ++-
 9 files changed, 171 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index d09e17d..2483391 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -32,7 +32,20 @@ object TaskContext {
    */
   def get(): TaskContext = taskContext.get
 
-  private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
+  /**
+   * Returns the partition id of currently active TaskContext. It will return 0
+   * if there is no active TaskContext for cases like local execution.
+   */
+  def getPartitionId(): Int = {
+    val tc = taskContext.get()
+    if (tc == null) {
+      0
+    } else {
+      tc.partitionId()
+    }
+  }
+
+  private[this] val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
 
   // Note: protected[spark] instead of private[spark] to prevent the following two from
   // showing up in JavaDoc.

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 9f6329b..328d635 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -56,6 +56,18 @@ class CodeGenContext {
    */
   val references: mutable.ArrayBuffer[Expression] = new mutable.ArrayBuffer[Expression]()
 
+  /**
+   * Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a
+   * 3-tuple: java type, variable name, code to init it.
+   * They will be kept as member variables in generated classes like `SpecificProjection`.
+   */
+  val mutableStates: mutable.ArrayBuffer[(String, String, String)] =
+    mutable.ArrayBuffer.empty[(String, String, String)]
+
+  def addMutableState(javaType: String, variableName: String, initialValue: String): Unit
= {
+    mutableStates += ((javaType, variableName, initialValue))
+  }
+
   val stringType: String = classOf[UTF8String].getName
   val decimalType: String = classOf[Decimal].getName
 
@@ -203,7 +215,10 @@ class CodeGenContext {
   def isPrimitiveType(dt: DataType): Boolean = isPrimitiveType(javaType(dt))
 }
 
-
+/**
+ * A wrapper for generated class, defines a `generate` method so that we can pass extra objects
+ * into generated class.
+ */
 abstract class GeneratedClass {
   def generate(expressions: Array[Expression]): Any
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index addb802..71e47d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -46,6 +46,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression],
() => Mu
             ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)};
         """
     }.mkString("\n")
+    val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue)
=>
+      s"private $javaType $variableName = $initialValue;"
+    }.mkString("\n      ")
     val code = s"""
       public Object generate($exprType[] expr) {
         return new SpecificProjection(expr);
@@ -55,6 +58,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression],
() => Mu
 
         private $exprType[] expressions = null;
         private $mutableRowType mutableRow = null;
+        $mutableStates
 
         public SpecificProjection($exprType[] expr) {
           expressions = expr;

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index d05dfc1..856ff9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -46,30 +46,47 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
   protected def create(ordering: Seq[SortOrder]): Ordering[InternalRow] = {
     val ctx = newCodeGenContext()
 
-    val comparisons = ordering.zipWithIndex.map { case (order, i) =>
-      val evalA = order.child.gen(ctx)
-      val evalB = order.child.gen(ctx)
+    val comparisons = ordering.map { order =>
+      val eval = order.child.gen(ctx)
       val asc = order.direction == Ascending
+      val isNullA = ctx.freshName("isNullA")
+      val primitiveA = ctx.freshName("primitiveA")
+      val isNullB = ctx.freshName("isNullB")
+      val primitiveB = ctx.freshName("primitiveB")
       s"""
           i = a;
-          ${evalA.code}
+          boolean $isNullA;
+          ${ctx.javaType(order.child.dataType)} $primitiveA;
+          {
+            ${eval.code}
+            $isNullA = ${eval.isNull};
+            $primitiveA = ${eval.primitive};
+          }
           i = b;
-          ${evalB.code}
-          if (${evalA.isNull} && ${evalB.isNull}) {
+          boolean $isNullB;
+          ${ctx.javaType(order.child.dataType)} $primitiveB;
+          {
+            ${eval.code}
+            $isNullB = ${eval.isNull};
+            $primitiveB = ${eval.primitive};
+          }
+          if ($isNullA && $isNullB) {
             // Nothing
-          } else if (${evalA.isNull}) {
+          } else if ($isNullA) {
             return ${if (order.direction == Ascending) "-1" else "1"};
-          } else if (${evalB.isNull}) {
+          } else if ($isNullB) {
             return ${if (order.direction == Ascending) "1" else "-1"};
           } else {
-            int comp = ${ctx.genComp(order.child.dataType, evalA.primitive, evalB.primitive)};
+            int comp = ${ctx.genComp(order.child.dataType, primitiveA, primitiveB)};
             if (comp != 0) {
               return ${if (asc) "comp" else "-comp"};
             }
           }
       """
     }.mkString("\n")
-
+    val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue)
=>
+      s"private $javaType $variableName = $initialValue;"
+    }.mkString("\n      ")
     val code = s"""
       public SpecificOrdering generate($exprType[] expr) {
         return new SpecificOrdering(expr);
@@ -78,6 +95,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
       class SpecificOrdering extends ${classOf[BaseOrdering].getName} {
 
         private $exprType[] expressions = null;
+        $mutableStates
 
         public SpecificOrdering($exprType[] expr) {
           expressions = expr;

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 274a42c..9e5a745 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -40,6 +40,9 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow)
=> Bool
   protected def create(predicate: Expression): ((InternalRow) => Boolean) = {
     val ctx = newCodeGenContext()
     val eval = predicate.gen(ctx)
+    val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue)
=>
+      s"private $javaType $variableName = $initialValue;"
+    }.mkString("\n      ")
     val code = s"""
       public SpecificPredicate generate($exprType[] expr) {
         return new SpecificPredicate(expr);
@@ -47,6 +50,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow)
=> Bool
 
       class SpecificPredicate extends ${classOf[Predicate].getName} {
         private final $exprType[] expressions;
+        $mutableStates
         public SpecificPredicate($exprType[] expr) {
           expressions = expr;
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 3c7ee9c..3e5ca30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -151,6 +151,10 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection]
{
         s"""if (!nullBits[$i]) arr[$i] = c$i;"""
     }.mkString("\n      ")
 
+    val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue)
=>
+      s"private $javaType $variableName = $initialValue;"
+    }.mkString("\n      ")
+
     val code = s"""
     public SpecificProjection generate($exprType[] expr) {
       return new SpecificProjection(expr);
@@ -158,6 +162,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection]
{
 
     class SpecificProjection extends ${classOf[BaseProject].getName} {
       private $exprType[] expressions = null;
+      $mutableStates
 
       public SpecificProjection($exprType[] expr) {
         expressions = expr;
@@ -165,65 +170,65 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection]
{
 
       @Override
       public Object apply(Object r) {
-        return new SpecificRow(expressions, (InternalRow) r);
+        return new SpecificRow((InternalRow) r);
       }
-    }
 
-    final class SpecificRow extends ${classOf[MutableRow].getName} {
+      final class SpecificRow extends ${classOf[MutableRow].getName} {
 
-      $columns
+        $columns
 
-      public SpecificRow($exprType[] expressions, InternalRow i) {
-        $initColumns
-      }
+        public SpecificRow(InternalRow i) {
+          $initColumns
+        }
 
-      public int length() { return ${expressions.length};}
-      protected boolean[] nullBits = new boolean[${expressions.length}];
-      public void setNullAt(int i) { nullBits[i] = true; }
-      public boolean isNullAt(int i) { return nullBits[i]; }
+        public int length() { return ${expressions.length};}
+        protected boolean[] nullBits = new boolean[${expressions.length}];
+        public void setNullAt(int i) { nullBits[i] = true; }
+        public boolean isNullAt(int i) { return nullBits[i]; }
 
-      public Object get(int i) {
-        if (isNullAt(i)) return null;
-        switch (i) {
-        $getCases
+        public Object get(int i) {
+          if (isNullAt(i)) return null;
+          switch (i) {
+          $getCases
+          }
+          return null;
         }
-        return null;
-      }
-      public void update(int i, Object value) {
-        if (value == null) {
-          setNullAt(i);
-          return;
+        public void update(int i, Object value) {
+          if (value == null) {
+            setNullAt(i);
+            return;
+          }
+          nullBits[i] = false;
+          switch (i) {
+          $updateCases
+          }
         }
-        nullBits[i] = false;
-        switch (i) {
-        $updateCases
+        $specificAccessorFunctions
+        $specificMutatorFunctions
+
+        @Override
+        public int hashCode() {
+          int result = 37;
+          $hashUpdates
+          return result;
         }
-      }
-      $specificAccessorFunctions
-      $specificMutatorFunctions
-
-      @Override
-      public int hashCode() {
-        int result = 37;
-        $hashUpdates
-        return result;
-      }
 
-      @Override
-      public boolean equals(Object other) {
-        if (other instanceof SpecificRow) {
-          SpecificRow row = (SpecificRow) other;
-          $columnChecks
-          return true;
+        @Override
+        public boolean equals(Object other) {
+          if (other instanceof SpecificRow) {
+            SpecificRow row = (SpecificRow) other;
+            $columnChecks
+            return true;
+          }
+          return super.equals(other);
         }
-        return super.equals(other);
-      }
 
-      @Override
-      public InternalRow copy() {
-        Object[] arr = new Object[${expressions.length}];
-        ${copyColumns}
-        return new ${classOf[GenericInternalRow].getName}(arr);
+        @Override
+        public InternalRow copy() {
+          Object[] arr = new Object[${expressions.length}];
+          ${copyColumns}
+          return new ${classOf[GenericInternalRow].getName}(arr);
+        }
       }
     }
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 6cdc300..e10ba55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.types.{DataType, DoubleType}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
@@ -38,11 +39,7 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable
{
    * Record ID within each partition. By being transient, the Random Number Generator is
    * reset every time we serialize and deserialize it.
    */
-  @transient protected lazy val partitionId = TaskContext.get() match {
-    case null => 0
-    case _ => TaskContext.get().partitionId()
-  }
-  @transient protected lazy val rng = new XORShiftRandom(seed + partitionId)
+  @transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.getPartitionId)
 
   override def deterministic: Boolean = false
 
@@ -61,6 +58,17 @@ case class Rand(seed: Long) extends RDG(seed) {
     case IntegerLiteral(s) => s
     case _ => throw new AnalysisException("Input argument to rand must be an integer literal.")
   })
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val rngTerm = ctx.freshName("rng")
+    val className = classOf[XORShiftRandom].getCanonicalName
+    ctx.addMutableState(className, rngTerm,
+      s"new $className($seed + org.apache.spark.TaskContext.getPartitionId())")
+    ev.isNull = "false"
+    s"""
+      final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextDouble();
+    """
+  }
 }
 
 /** Generate a random column with i.i.d. gaussian random distribution. */
@@ -73,4 +81,15 @@ case class Randn(seed: Long) extends RDG(seed) {
     case IntegerLiteral(s) => s
     case _ => throw new AnalysisException("Input argument to rand must be an integer literal.")
   })
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val rngTerm = ctx.freshName("rng")
+    val className = classOf[XORShiftRandom].getCanonicalName
+    ctx.addMutableState(className, rngTerm,
+      s"new $className($seed + org.apache.spark.TaskContext.getPartitionId())")
+    ev.isNull = "false"
+    s"""
+      final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextGaussian();
+    """
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 437d143..69a3775 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.expressions
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{LongType, DataType}
 
 /**
@@ -40,6 +41,8 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression
{
    */
   @transient private[this] var count: Long = 0L
 
+  @transient private lazy val partitionMask = TaskContext.getPartitionId.toLong <<
33
+
   override def nullable: Boolean = false
 
   override def dataType: DataType = LongType
@@ -47,6 +50,20 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression
{
   override def eval(input: InternalRow): Long = {
     val currentCount = count
     count += 1
-    (TaskContext.get().partitionId().toLong << 33) + currentCount
+    partitionMask + currentCount
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val countTerm = ctx.freshName("count")
+    val partitionMaskTerm = ctx.freshName("partitionMask")
+    ctx.addMutableState(ctx.JAVA_LONG, countTerm, "0L")
+    ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm,
+      "((long) org.apache.spark.TaskContext.getPartitionId()) << 33")
+
+    ev.isNull = "false"
+    s"""
+      final ${ctx.javaType(dataType)} ${ev.primitive} = $partitionMaskTerm + $countTerm;
+      $countTerm++;
+    """
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fa4ec360/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 822d3d8..5f1b514 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.expressions
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{IntegerType, DataType}
 
 
@@ -32,5 +33,14 @@ private[sql] case object SparkPartitionID extends LeafExpression {
 
   override def dataType: DataType = IntegerType
 
-  override def eval(input: InternalRow): Int = TaskContext.get().partitionId()
+  @transient private lazy val partitionId = TaskContext.getPartitionId
+
+  override def eval(input: InternalRow): Int = partitionId
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val idTerm = ctx.freshName("partitionId")
+    ctx.addMutableState(ctx.JAVA_INT, idTerm, "org.apache.spark.TaskContext.getPartitionId()")
+    ev.isNull = "false"
+    s"final ${ctx.javaType(dataType)} ${ev.primitive} = $idTerm;"
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message