Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D84EB17EF0 for ; Tue, 6 Oct 2015 12:30:27 +0000 (UTC) Received: (qmail 12428 invoked by uid 500); 6 Oct 2015 12:30:27 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 12388 invoked by uid 500); 6 Oct 2015 12:30:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 12379 invoked by uid 99); 6 Oct 2015 12:30:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Oct 2015 12:30:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 887AFDFF95; Tue, 6 Oct 2015 12:30:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: twalthr@apache.org To: commits@flink.apache.org Message-Id: <19e56352edac49009f6227c29d53cbac@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-2642] [table] Scala Table API crashes when executing word count example Date: Tue, 6 Oct 2015 12:30:27 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 4c5d43b6f -> 4938ff09f [FLINK-2642] [table] Scala Table API crashes when executing word count example This closes #1209. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4938ff09 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4938ff09 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4938ff09 Branch: refs/heads/master Commit: 4938ff09f692d7b8b1c3af16125d2216eb5c623c Parents: 4c5d43b Author: twalthr Authored: Fri Oct 2 11:12:01 2015 +0200 Committer: twalthr Committed: Tue Oct 6 14:26:27 2015 +0200 ---------------------------------------------------------------------- .../flink/api/table/plan/PlanTranslator.scala | 3 +- .../flink/examples/scala/WordCountTable.scala | 45 ++++++++++++++++++++ .../scala/table/test/TypeExceptionTest.scala | 42 ++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala index 354c7d4..ba8aba4 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala @@ -110,7 +110,8 @@ abstract class PlanTranslator { } val clazz = repr.getType().getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) + || clazz.getCanonicalName() == null) { throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " + clazz.getName + ". Only top-level classes or static members classes " + " are supported.") http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala new file mode 100644 index 0000000..cac9590 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ + +/** + * Simple example for demonstrating the use of the Table API for a Word Count. + */ +object WordCountTable { + + case class WC(word: String, count: Int) + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + val expr = input.toTable + val result = expr + .groupBy('word) + .select('word, 'count.sum as 'count) + .toDataSet[WC] + + result.print() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala new file mode 100644 index 0000000..acb7ded --- /dev/null +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala @@ -0,0 +1,42 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.ExpressionException +import org.junit.Test + +class TypeExceptionTest { + + @Test(expected = classOf[ExpressionException]) + def testInnerCaseClassException(): Unit = { + case class WC(word: String, count: Int) + + val env = ExecutionEnvironment.getExecutionEnvironment + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + val expr = input.toTable // this should fail + val result = expr + .groupBy('word) + .select('word, 'count.sum as 'count) + .toDataSet[WC] + + result.print() + } +}