flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [2/2] flink git commit: [FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples
Date Tue, 02 Aug 2016 14:25:46 GMT
[FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples

This closes #2274.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123c637e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123c637e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123c637e

Branch: refs/heads/master
Commit: 123c637e804bfdd6569051cf705ec73b5cb95352
Parents: 45df1b2
Author: Jark Wu <wuchong.wc@alibaba-inc.com>
Authored: Thu Jul 21 00:31:01 2016 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Tue Aug 2 16:24:59 2016 +0200

----------------------------------------------------------------------
 .../flink/examples/java/JavaSQLExample.java     | 72 ++++++++++++++++++++
 .../flink/examples/scala/StreamSQLExample.scala | 62 +++++++++++++++++
 .../examples/scala/StreamTableExample.scala     | 58 ++++++++++++++++
 .../flink/examples/scala/WordCountSQL.scala     | 43 ++++++++++++
 4 files changed, 235 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
new file mode 100644
index 0000000..bbac94a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+ * Minimal Java program demonstrating how a batch SQL query is run on a DataSet.
+ */
+public class JavaSQLExample {
+
+	/** Element type of the example: a word together with its frequency. */
+	public static class WC {
+		public String word;
+		public long frequency;
+
+		public WC() {
+			// intentionally empty: the public constructor is there to make this a Flink POJO
+		}
+
+		public WC(String word, long frequency) {
+			this.frequency = frequency;
+			this.word = word;
+		}
+
+		@Override
+		public String toString() {
+			return new StringBuilder("WC ").append(word).append(" ").append(frequency).toString();
+		}
+	}
+
+	/** Runs the example; {@code args} is not used. */
+	public static void main(String[] args) throws Exception {
+		// the query sums the frequencies per word; the alias names the sum column "frequency"
+		final String query =
+			"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word";
+
+		// batch environment and the table environment created from it
+		ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(batchEnv);
+
+		// sample input, registered as table "WordCount" with fields word and frequency
+		DataSet<WC> words = batchEnv.fromElements(
+			new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));
+		tEnv.registerDataSet("WordCount", words, "word, frequency");
+
+		// run the query, convert the resulting Table back into a DataSet of WC and print it
+		Table counts = tEnv.sql(query);
+
+		DataSet<WC> summed = tEnv.toDataSet(counts, WC.class);
+		summed.print();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
new file mode 100644
index 0000000..8eed77d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Minimal streaming program that runs a SQL query over two registered DataStreams.
+  */
+object StreamSQLExample {
+
+  /** Record type of the elements in both order streams. */
+  case class Order(user: Long, product: String, amount: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // streaming environment and the table environment created from it
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+
+    // sample input for the two order streams
+    val ordersA = Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))
+    val ordersB = Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))
+    val streamA: DataStream[Order] = streamEnv.fromCollection(ordersA)
+    val streamB: DataStream[Order] = streamEnv.fromCollection(ordersB)
+
+    // make both streams available to SQL under the names "OrderA" and "OrderB"
+    tableEnv.registerDataStream("OrderA", streamA, 'user, 'product, 'amount)
+    tableEnv.registerDataStream("OrderB", streamB, 'user, 'product, 'amount)
+
+    // rows of OrderA with amount > 2, combined via UNION ALL with rows of OrderB with amount < 2
+    val query =
+      "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
+        "SELECT STREAM * FROM OrderB WHERE amount < 2"
+    val unioned = tableEnv.sql(query)
+
+    // convert the result table back into a stream of Order and print it
+    unioned.toDataStream[Order].print()
+
+    // trigger execution of the streaming program
+    streamEnv.execute()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
new file mode 100644
index 0000000..9081f50
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Minimal streaming program that combines and filters two streams with the Table API.
+  */
+object StreamTableExample {
+
+  /** Record type of the elements in both order streams. */
+  case class Order(user: Long, product: String, amount: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // streaming environment and the table environment created from it
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+
+    // sample input; each collection is turned into a stream and then into a Table
+    val ordersA = Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))
+    val ordersB = Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))
+    val tableA = streamEnv.fromCollection(ordersA).toTable(tableEnv)
+    val tableB = streamEnv.fromCollection(ordersB).toTable(tableEnv)
+
+    // union both tables, select the three fields and keep only rows with amount > 2
+    val filtered = tableA.unionAll(tableB)
+      .select('user, 'product, 'amount)
+      .where('amount > 2)
+
+    // convert back into a stream of Order and print it
+    val result: DataStream[Order] = filtered.toDataStream[Order]
+    result.print()
+
+    // trigger execution of the streaming program
+    streamEnv.execute()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
new file mode 100644
index 0000000..41efffc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+
+/**
+  * Minimal Scala program demonstrating how a batch SQL query is run on a DataSet.
+  */
+object WordCountSQL {
+  case class WC(word: String, count: Int)
+
+  def main(args: Array[String]): Unit = {
+    // batch environment and the table environment created from it
+    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(batchEnv)
+
+    // sample input as table "WordCount"; WC's second field `count` is registered as 'frequency
+    val words = batchEnv.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    tableEnv.registerDataSet("WordCount", words, 'word, 'frequency)
+
+    // sum the frequencies per word, convert the result back into a DataSet of WC and print it
+    val query = "SELECT word, SUM(frequency) FROM WordCount GROUP BY word"
+    tableEnv.sql(query).toDataSet[WC].print()
+  }
+}


Mime
View raw message