From: fhueske@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Tue, 16 Feb 2016 12:57:03 -0000
Message-Id: <80def2f070f642019c77135458ae488a@git.apache.org>
Subject: [1/2] flink git commit: [FLINK-3226] Casting support for arithmetic operators

Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 7233c241d -> 1c53c873a


[FLINK-3226] Casting support for arithmetic operators

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d765d08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d765d08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d765d08

Branch: refs/heads/tableOnCalcite
Commit: 9d765d0833cabfa6edd4097668b6ea7bd182ad76
Parents: 7233c24
Author: twalthr
Authored: Sat Feb 13 12:38:12 2016 +0100
Committer: Fabian Hueske
Committed: Tue Feb 16 10:02:24 2016 +0100

----------------------------------------------------------------------
 .../api/table/codegen/OperatorCodeGen.scala   | 38 ++++++++++++--
 .../api/java/table/test/CastingITCase.java    | 55 ++++++++++----------
 .../api/scala/table/test/CastingITCase.scala  | 47 +++++++++--------
 3 files changed, 87 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d765d08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
index 8402569..0f8083e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
@@ -18,20 +18,52 @@ package org.apache.flink.api.table.codegen
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 
 object OperatorCodeGen {
 
-  def generateArithmeticOperator(
+  def generateArithmeticOperator(
       operator: String,
       nullCheck: Boolean,
       resultType: TypeInformation[_],
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+    // String arithmetic // TODO rework
+    if (isString(left)) {
+      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
       (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+      }
+    }
+    // Numeric arithmetic
+    else if (isNumeric(left) && isNumeric(right)) {
+      val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+      val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+      val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+        (leftTerm, rightTerm) =>
+          // no casting required
+          if (leftType == resultType && rightType == resultType) {
+            s"$leftTerm $operator $rightTerm"
+          }
+          // left needs casting
+          else if (leftType != resultType && rightType == resultType) {
+            s"(($resultTypeTerm) $leftTerm) $operator $rightTerm"
+          }
+          // right needs casting
+          else if (leftType == resultType && rightType != resultType) {
+            s"$leftTerm $operator (($resultTypeTerm) $rightTerm)"
+          }
+          // both sides need casting
+          else {
+            s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) $rightTerm)"
+          }
+      }
+    }
+    else {
+      throw new CodeGenException("Unsupported arithmetic operation.")
     }
   }
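
The change above makes the code generator wrap an operand in an explicit Java cast only when that operand's type differs from the result type of the arithmetic expression. The standalone Scala sketch below illustrates that cast-placement rule; CastSketch, castTo and generatedArithmetic are made-up names used only for illustration and are not part of Flink, and the four explicit branches of the commit collapse here into casting each side independently.

object CastSketch {

  // Stand-in for the primitive type terms that the real code generator derives
  // via primitiveTypeTermForTypeInfo(...) from TypeInformation instances.
  type TypeTerm = String

  // Wrap a term in a Java cast only when its type differs from the result type.
  def castTo(term: String, termType: TypeTerm, resultType: TypeTerm): String =
    if (termType == resultType) term else s"(($resultType) $term)"

  // Same effect as the four branches in generateArithmeticOperator above:
  // cast left, right, both, or neither operand, depending on which sides
  // already match the result type.
  def generatedArithmetic(
      operator: String,
      leftTerm: String, leftType: TypeTerm,
      rightTerm: String, rightType: TypeTerm,
      resultType: TypeTerm): String =
    s"${castTo(leftTerm, leftType, resultType)} $operator ${castTo(rightTerm, rightType, resultType)}"

  def main(args: Array[String]): Unit = {
    // BYTE + INT with an INT result: only the left operand needs a cast.
    println(generatedArithmetic("+", "f0", "byte", "f1", "int", "int"))
    // prints: ((int) f0) + f1

    // LONG + DOUBLE with a DOUBLE result: only the left operand needs a cast.
    println(generatedArithmetic("+", "f6", "long", "lit", "double", "double"))
    // prints: ((double) f6) + lit
  }
}
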
http://git-wip-us.apache.org/repos/asf/flink/blob/9d765d08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 957c093..e5b5f58 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -18,77 +18,76 @@ package org.apache.flink.api.java.table.test;
 
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Ignore;
+import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
-
-import java.util.List;
-
 @RunWith(Parameterized.class)
-public class CastingITCase extends MultipleProgramsTestBase {
+public class CastingITCase extends TableProgramsTestBase {
 
-    public CastingITCase(TestExecutionMode mode){
-        super(mode);
+    public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){
+        super(mode, configMode);
     }
 
-    @Ignore
-    @Test(expected = NotImplementedError.class)
+    @Test
    public void testNumericAutocastInArithmetic() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-       TableEnvironment tableEnv = new TableEnvironment();
+       TableEnvironment tableEnv = getJavaTableEnvironment();
 
-       DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
-               env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+       DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, Long, Double>> input =
+               env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
 
        Table table =
                tableEnv.fromDataSet(input);
 
        Table result = table.select("f0 + 1, f1 +" +
-               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
+               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0");
 
        DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
        List<Row> results = ds.collect();
-       String expected = "2,2,2,2.0,2.0,2.0";
+       String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
        compareResultAsText(results, expected);
    }
 
    @Test
    public void testNumericAutocastInComparison() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-       TableEnvironment tableEnv = new TableEnvironment();
+       TableEnvironment tableEnv = getJavaTableEnvironment();
 
-       DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+       DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input =
                env.fromElements(
-                       new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
-                       new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
+                       new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d),
+                       new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d));
 
        Table table =
-               tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
+               tableEnv.fromDataSet(input, "a,b,c,d,e,f");
 
        Table result = table
                .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
 
        DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
        List<Row> results = ds.collect();
-       String expected = "2,2,2,2,2.0,2.0,Hello";
+       String expected = "2,2,2,2,2.0,2.0";
        compareResultAsText(results, expected);
    }
 
+   // TODO support advanced String operations
+
    @Test(expected = CodeGenException.class)
    public void testCastFromString() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
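
The expected result strings in the tests above rely on both operands being widened to a common result type before the operator is applied; for example "f7 + f0" adds a DOUBLE and a BYTE and yields 1002.1, and "f3 + 1.0f" adds a LONG and a FLOAT and yields 2.0. The sketch below is a rough, standalone illustration of Java-style binary numeric promotion, which matches these expectations; PromotionSketch and promote are made-up names, and the actual result types in the Table API are decided by the planner and the code generator, which may differ in detail.

object PromotionSketch {

  // Numeric types ordered from narrowest to widest.
  private val rank = List("byte", "short", "int", "long", "float", "double").zipWithIndex.toMap

  // The wider of the two operand types, but never narrower than int
  // (mirroring Java's promotion of byte/short operands to int).
  def promote(a: String, b: String): String = {
    val widest = if (rank(a) >= rank(b)) a else b
    if (rank(widest) < rank("int")) "int" else widest
  }

  def main(args: Array[String]): Unit = {
    println(promote("byte", "int"))    // int    -> "f0 + 1"    prints 2
    println(promote("long", "float"))  // float  -> "f3 + 1.0f" prints 2.0
    println(promote("long", "double")) // double -> "f6 + 1.0d" prints 2.0
    println(promote("double", "byte")) // double -> "f7 + f0"   prints 1002.1
  }
}
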
http://git-wip-us.apache.org/repos/asf/flink/blob/9d765d08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index d6a853d..6121cb6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -19,45 +19,33 @@ package org.apache.flink.api.scala.table.test
 
 import java.util.Date
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
 import scala.collection.JavaConverters._
-import org.apache.flink.api.table.codegen.CodeGenException
 
 @RunWith(classOf[Parameterized])
 class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Ignore // String autocasting not yet supported
   @Test
-  def testAutoCastToString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
-
-    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore // gives different types of exceptions for cluster and collection modes
-  @Test(expected = classOf[NotImplementedError])
   def testNumericAutoCastInArithmetic(): Unit = {
 
     // don't test everything, just some common cast directions
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
 
-    val expected = "2,2,2,2.0,2.0,2.0"
+    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -78,6 +66,21 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // TODO support advanced String operations
+
+  @Ignore
+  @Test
+  def testAutoCastToString(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
+
+    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
   @Test(expected = classOf[CodeGenException])
   def testCastFromString: Unit = {