beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [1/2] beam git commit: support TUMBLE_START/HOP_START/SESSION_START functions
Date Wed, 12 Jul 2017 16:38:03 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL aa265e62a -> e38cf43df


support TUMBLE_START/HOP_START/SESSION_START functions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4391e1dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4391e1dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4391e1dd

Branch: refs/heads/DSL_SQL
Commit: 4391e1dd895c6fb0aa2d3600792415ad2d041c5b
Parents: aa265e6
Author: mingmxu <mingmxu@ebay.com>
Authored: Sun Jul 9 00:52:23 2017 -0700
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jul 12 09:35:50 2017 -0700

----------------------------------------------------------------------
 .../interpreter/operator/BeamSqlPrimitive.java  |  4 +++
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  2 +-
 .../transform/BeamAggregationTransforms.java    |  8 ++++-
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 35 +++++++++++++++-----
 .../transform/BeamAggregationTransformTest.java |  2 +-
 5 files changed, 39 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index 92d1263..c5c80b9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -97,6 +97,10 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
     return (Date) getValue();
   }
 
+  public BigDecimal getDecimal() {
+    return (BigDecimal) getValue();
+  }
+
   @Override
   public boolean accept() {
     if (value == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 9bb2902..5389ec7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -110,7 +110,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
 
     PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            CalciteUtils.toBeamRecordType(getRowType()), getAggCallList())));
+            CalciteUtils.toBeamRecordType(getRowType()), getAggCallList(), windowFieldIdx)));
     mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
 
     return mergedStream;

http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 9c0b4a3..41e5837 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -59,13 +59,16 @@ public class BeamAggregationTransforms implements Serializable{
   public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
     private BeamSqlRecordType outRecordType;
     private List<String> aggFieldNames;
+    private int windowFieldIdx;
 
-    public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList) {
+    public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList
+        , int windowFieldIdx) {
       this.outRecordType = outRecordType;
       this.aggFieldNames = new ArrayList<>();
       for (AggregateCall ac : aggList) {
         aggFieldNames.add(ac.getName());
       }
+      this.windowFieldIdx = windowFieldIdx;
     }
 
     @ProcessElement
@@ -80,6 +83,9 @@ public class BeamAggregationTransforms implements Serializable{
       for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
         outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
       }
+      if (windowFieldIdx != -1) {
+        outRecord.addField(windowFieldIdx, outRecord.getWindowStart().toDate());
+      }
 
       c.output(outRecord);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index f92c803..ac0b1cb 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -159,24 +159,29 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testTumbleWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+        + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
+        + " FROM TABLE_A "
         + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
     PCollection<BeamSqlRow> result =
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
         .apply("testTumbleWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT));
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+        Arrays.asList("f_int2", "size", "window_start"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
+    record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
     record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
     record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
     BeamSqlRow record2 = new BeamSqlRow(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
+    record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
     record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
     record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
 
@@ -190,35 +195,42 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testHopWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION "
+    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+        + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
+        + " FROM PCOLLECTION "
         + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
     PCollection<BeamSqlRow> result =
         inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT));
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+        Arrays.asList("f_int2", "size", "window_start"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
+    record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
     record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
     record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
 
     BeamSqlRow record2 = new BeamSqlRow(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 3L);
+    record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
     record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
     record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
     BeamSqlRow record3 = new BeamSqlRow(resultType);
     record3.addField("f_int2", 0);
     record3.addField("size", 1L);
+    record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
     record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
     record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
 
     BeamSqlRow record4 = new BeamSqlRow(resultType);
     record4.addField("f_int2", 0);
     record4.addField("size", 1L);
+    record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
     record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
     record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
 
@@ -232,24 +244,29 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testSessionWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+        + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
+        + " FROM TABLE_A "
         + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
     PCollection<BeamSqlRow> result =
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
         .apply("testSessionWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT));
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+        Arrays.asList("f_int2", "size", "window_start"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
+    record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
     record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
     record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
 
     BeamSqlRow record2 = new BeamSqlRow(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
+    record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
     record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
     record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index 2b01254..a0fed22 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -122,7 +122,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
 
     //4. flat KV to a single record
     PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls)));
+        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
     mergedStream.setCoder(outRecordCoder);
 
     //assert function BeamAggregationTransform.AggregationGroupByKeyFn


Mime
View raw message