flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [4/4] flink git commit: [FLINK-3226] implement getUniqueName method in TranslationContext
Date Thu, 11 Feb 2016 17:00:30 GMT
[FLINK-3226] implement getUniqueName method in TranslationContext

This closes #1600 and #1567


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fff25df5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fff25df5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fff25df5

Branch: refs/heads/tableOnCalcite
Commit: fff25df5ee720f2aec2be5a6309e31968ecdac09
Parents: 509c4b9
Author: vasia <vasia@apache.org>
Authored: Thu Feb 11 14:24:24 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Thu Feb 11 16:48:24 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/plan/RexNodeTranslator.scala     |  2 +-
 .../flink/api/table/plan/TranslationContext.scala    |  4 ++++
 .../plan/nodes/dataset/DataSetGroupReduce.scala      |  2 +-
 .../api/java/table/test/AggregationsITCase.java      |  1 -
 .../api/scala/table/test/AggregationsITCase.scala    | 15 ++++++++++++++-
 5 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 07e3924..bad111f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -35,7 +35,7 @@ object RexNodeTranslator {
 
     exp match {
       case agg: Aggregation =>
-        val name = "TMP_" + agg.hashCode().toHexString.toUpperCase
+        val name = TranslationContext.getUniqueName
         val aggCall = toAggCall(agg, name, relBuilder)
         val fieldExp = new UnresolvedFieldReference(name)
         (fieldExp, List(aggCall))

http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index b2b0c2b..51af8d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -70,6 +70,10 @@ object TranslationContext {
 
   }
 
+  def getUniqueName: String = {
+    "TMP_" + nameCntr.getAndIncrement()
+  }
+
   def getRelBuilder: RelBuilder = {
     relBuilder
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index ad7e0e9..afe09bb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -67,7 +67,7 @@ class DataSetGroupReduce(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
 
     // get the output types
     val fieldsNames = rowType.getFieldNames

http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 8e81893..bcb2308 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -62,7 +62,6 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	@Ignore //DataSetMap needs to be implemented
 	@Test
 	public void testAggregationTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 64f6757..68cb1ed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -32,7 +32,6 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Ignore //DataSetMap needs to be implemented
   @Test
   def testAggregationTypes(): Unit = {
 
@@ -71,6 +70,20 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testProjection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = env.fromElements(
+      (1: Byte, 1: Short),
+      (2: Byte, 2: Short)).toTable
+      .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+
+    val expected = "1,3,2,1,3"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
   @Ignore // it seems like the arithmetic expression is added to the field position
   @Test(expected = classOf[NotImplementedError])
   def testAggregationWithArithmetic(): Unit = {


Mime
View raw message