beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [1/2] beam git commit: use static table name PCOLLECTION in BeamSql.simpleQuery.
Date Tue, 27 Jun 2017 00:38:13 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 8e9b930bc -> bd99528af


use static table name PCOLLECTION in BeamSql.simpleQuery.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f612049
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f612049
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f612049

Branch: refs/heads/DSL_SQL
Commit: 1f612049b83a67070d13aae790d61e0f71d79ca7
Parents: 8e9b930
Author: mingmxu <mingmxu@ebay.com>
Authored: Thu Jun 22 16:50:58 2017 -0700
Committer: mingmxu <mingmxu@ebay.com>
Committed: Thu Jun 22 16:50:58 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 27 +++++++++++++++-----
 .../dsls/sql/BeamSqlDslAggregationTest.java     |  6 ++---
 .../beam/dsls/sql/BeamSqlDslFilterTest.java     |  2 +-
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    |  2 +-
 4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index e68188b..5f90380 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -50,9 +50,8 @@ PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/path
     .apply(...);
 
 //run a simple query, and register the output as a table in BeamSql;
-String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
-PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
-        .withUdf("MY_FUNC", myFunc));
+String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1));
 
 //run a JOIN with one table from TextIO, and one table from another query
 PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
@@ -91,6 +90,8 @@ public class BeamSql {
    *
    * <p>This is a simplified form of {@link #query(String)} where the query must reference
    * a single input table.
+   *
+   * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>.
    */
   public static PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>>
   simpleQuery(String sqlQuery) throws Exception {
@@ -151,15 +152,20 @@ public class BeamSql {
    */
   private static class SimpleQueryTransform
       extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>>
{
+    private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
     BeamSqlEnv sqlEnv = new BeamSqlEnv();
     private String sqlQuery;
 
     public SimpleQueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
+      validateQuery();
     }
 
-    @Override
-    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+    // public SimpleQueryTransform withUdf(String udfName){
+    // throw new UnsupportedOperationException("Pending for UDF support");
+    // }
+
+    private void validateQuery() {
       SqlNode sqlNode;
       try {
         sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
@@ -171,12 +177,19 @@ public class BeamSql {
       if (sqlNode instanceof SqlSelect) {
         SqlSelect select = (SqlSelect) sqlNode;
         String tableName = select.getFrom().toString();
-        return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
-            .apply(new QueryTransform(sqlQuery, sqlEnv));
+        if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
+          throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
+        }
       } else {
         throw new UnsupportedOperationException(
             "Sql operation: " + sqlNode.toString() + " is not supported!");
       }
     }
+
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+      return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME),
input)
+          .apply(new QueryTransform(sqlQuery, sqlEnv));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index f7349c6..b0509ae 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testAggregationWithoutWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2";
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 
     PCollection<BeamSqlRow> result =
         inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
@@ -125,7 +125,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testDistinct() throws Exception {
-    String sql = "SELECT distinct f_int, f_long FROM TABLE_A ";
+    String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
 
     PCollection<BeamSqlRow> result =
         inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
@@ -190,7 +190,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testHopWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION "
         + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
     PCollection<BeamSqlRow> result =
         inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));

http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
index b68e526..254b96d 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
    */
   @Test
   public void testSingleFilter() throws Exception {
-    String sql = "SELECT * FROM TABLE_A WHERE f_int = 1";
+    String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
 
     PCollection<BeamSqlRow> result =
         inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));

http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 2998682..1faa4d0 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
    */
   @Test
   public void testSelectAll() throws Exception {
-    String sql = "SELECT * FROM TABLE_A";
+    String sql = "SELECT * FROM PCOLLECTION";
 
     PCollection<BeamSqlRow> result =
         inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));


Mime
View raw message