beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: register table for both BeamSql.simpleQuery and BeamSql.query
Date Wed, 14 Jun 2017 00:08:50 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL d52df7471 -> 1e080e2ba


register table for both BeamSql.simpleQuery and BeamSql.query


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ce59eec7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ce59eec7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ce59eec7

Branch: refs/heads/DSL_SQL
Commit: ce59eec7f88dfcbbdb16a0db420e0ce541b47468
Parents: d52df74
Author: mingmxu <mingmxu@ebay.com>
Authored: Sat Jun 10 13:46:17 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue Jun 13 17:06:27 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 12 +++++++---
 .../beam/dsls/sql/example/BeamSqlExample.java   | 24 ++++++++++++++++----
 2 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ce59eec7/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 809fed3..ae281ac 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -110,6 +110,15 @@ public class BeamSql {
 
     @Override
     public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+      //register tables
+      for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+        BeamSqlEnv.registerTable(sourceTag.getId(),
+            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema().toRelDataType()));
+      }
+
       BeamRelNode beamRelNode = null;
       try {
         beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
@@ -149,13 +158,10 @@ public class BeamSql {
       } catch (SqlParseException e) {
         throw new IllegalStateException(e);
       }
-      BeamSqlRowCoder inputCoder = (BeamSqlRowCoder) input.getCoder();
 
       if (sqlNode instanceof SqlSelect) {
         SqlSelect select = (SqlSelect) sqlNode;
         String tableName = select.getFrom().toString();
-        BeamSqlEnv.registerTable(tableName,
-            new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType()));
         return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
             .apply(BeamSql.query(sqlQuery));
       } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/ce59eec7/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 4d7328e..36e1aa9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,16 +60,30 @@ public class BeamSqlExample {
     PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
         .withCoder(new BeamSqlRowCoder(type)));
 
-    //run a simple SQL query over input PCollection;
-    String sql = "select c2, c3 from TABLE_A where c1=1";
-    PCollection<BeamSqlRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
+    //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+    PCollection<BeamSqlRow> outputStream = inputTable.apply(
+        BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1"));
 
     //log out the output record;
     outputStream.apply("log_result",
         MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>()
{
+      public Void apply(BeamSqlRow input) {
+        System.out.println("TABLE_A: " + input);
+        return null;
+      }
+    }));
+
+    //Case 2. run the query with BeamSql.query
+    PCollection<BeamSqlRow> outputStream2 =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable)
+        .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1"));
+
+    //log out the output record;
+    outputStream2.apply("log_result",
+        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>()
{
       @Override
       public Void apply(BeamSqlRow input) {
-        LOG.info(input.valueInString());
+        System.out.println("TABLE_B: " + input);
         return null;
       }
     }));


Mime
View raw message