beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [1/2] beam git commit: Test queries on unbounded PCollections with BeamSql DSL API. Also add getTYPE(fieldName) overrides to BeamSqlRow.
Date Wed, 12 Jul 2017 17:04:24 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL a96c3a01f -> 8defe6f21


Test queries on unbounded PCollections with BeamSql DSL API.
Also add getTYPE(fieldName) overrides to BeamSqlRow.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0580e8b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0580e8b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0580e8b6

Branch: refs/heads/DSL_SQL
Commit: 0580e8b639ef77c7a6534b7b91ecad493950a3aa
Parents: a96c3a0
Author: mingmxu <mingmxu@ebay.com>
Authored: Wed Jul 12 00:08:35 2017 -0700
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jul 12 10:01:34 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |  84 ++++++++----
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 127 +++++++++++++++----
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    |  45 ++++++-
 .../beam/dsls/sql/BeamSqlDslFilterTest.java     |  62 +++++++--
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    |  94 +++++++++++---
 5 files changed, 327 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 2d7e350..db0ce04 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -152,48 +152,48 @@ public class BeamSqlRow implements Serializable {
     dataValues.set(index, fieldValue);
   }
 
-  public byte getByte(int idx) {
-    return (Byte) getFieldValue(idx);
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
   }
 
-  public short getShort(int idx) {
-    return (Short) getFieldValue(idx);
+  public byte getByte(String fieldName) {
+    return (Byte) getFieldValue(fieldName);
   }
 
-  public int getInteger(int idx) {
-    return (Integer) getFieldValue(idx);
+  public short getShort(String fieldName) {
+    return (Short) getFieldValue(fieldName);
   }
 
-  public float getFloat(int idx) {
-    return (Float) getFieldValue(idx);
+  public int getInteger(String fieldName) {
+    return (Integer) getFieldValue(fieldName);
   }
 
-  public double getDouble(int idx) {
-    return (Double) getFieldValue(idx);
+  public float getFloat(String fieldName) {
+    return (Float) getFieldValue(fieldName);
   }
 
-  public long getLong(int idx) {
-    return (Long) getFieldValue(idx);
+  public double getDouble(String fieldName) {
+    return (Double) getFieldValue(fieldName);
   }
 
-  public String getString(int idx) {
-    return (String) getFieldValue(idx);
+  public long getLong(String fieldName) {
+    return (Long) getFieldValue(fieldName);
   }
 
-  public Date getDate(int idx) {
-    return (Date) getFieldValue(idx);
+  public String getString(String fieldName) {
+    return (String) getFieldValue(fieldName);
   }
 
-  public GregorianCalendar getGregorianCalendar(int idx) {
-    return (GregorianCalendar) getFieldValue(idx);
+  public Date getDate(String fieldName) {
+    return (Date) getFieldValue(fieldName);
   }
 
-  public BigDecimal getBigDecimal(int idx) {
-    return (BigDecimal) getFieldValue(idx);
+  public GregorianCalendar getGregorianCalendar(String fieldName) {
+    return (GregorianCalendar) getFieldValue(fieldName);
   }
 
-  public Object getFieldValue(String fieldName) {
-    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  public BigDecimal getBigDecimal(String fieldName) {
+    return (BigDecimal) getFieldValue(fieldName);
   }
 
   public Object getFieldValue(int fieldIdx) {
@@ -281,6 +281,46 @@ public class BeamSqlRow implements Serializable {
     }
   }
 
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
   public int size() {
     return dataValues.size();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index ac0b1cb..471a856 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -29,18 +29,31 @@ import org.joda.time.Instant;
 import org.junit.Test;
 
 /**
- * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window.
+ * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window
+ * with BOUNDED PCollection.
  */
 public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   /**
-   * GROUP-BY with single aggregation function.
+   * GROUP-BY with single aggregation function with bounded PCollection.
    */
   @Test
-  public void testAggregationWithoutWindow() throws Exception {
+  public void testAggregationWithoutWindowWithBounded() throws Exception {
+    runAggregationWithoutWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with single aggregation function with unbounded PCollection.
+   */
+  @Test
+  public void testAggregationWithoutWindowWithUnbounded() throws Exception {
+    runAggregationWithoutWindow(unboundedInput1);
+  }
+
+  private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 
     PCollection<BeamSqlRow> result =
-        inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
+        input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
@@ -55,10 +68,22 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   /**
-   * GROUP-BY with multiple aggregation functions.
+   * GROUP-BY with multiple aggregation functions with bounded PCollection.
    */
   @Test
-  public void testAggregationFunctions() throws Exception{
+  public void testAggregationFunctionsWithBounded() throws Exception{
+    runAggregationFunctions(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with multiple aggregation functions with unbounded PCollection.
+   */
+  @Test
+  public void testAggregationFunctionsWithUnbounded() throws Exception{
+    runAggregationFunctions(unboundedInput1);
+  }
+
+  private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
     String sql = "select f_int2, count(*) as size, "
         + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
         + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
@@ -70,7 +95,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         + "FROM TABLE_A group by f_int2";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testAggregationFunctions", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(
@@ -121,14 +146,26 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   /**
-   * Implicit GROUP-BY with DISTINCT.
+   * Implicit GROUP-BY with DISTINCT with bounded PCollection.
+   */
+  @Test
+  public void testDistinctWithBounded() throws Exception {
+    runDistinct(boundedInput1);
+  }
+
+  /**
+   * Implicit GROUP-BY with DISTINCT with unbounded PCollection.
    */
   @Test
-  public void testDistinct() throws Exception {
+  public void testDistinctWithUnbounded() throws Exception {
+    runDistinct(unboundedInput1);
+  }
+
+  private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
 
     PCollection<BeamSqlRow> result =
-        inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
+        input.apply("testDistinct", BeamSql.simpleQuery(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
@@ -155,16 +192,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   /**
-   * GROUP-BY with TUMBLE window(akka fix_time_window).
+   * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection.
    */
   @Test
-  public void testTumbleWindow() throws Exception {
+  public void testTumbleWindowWithBounded() throws Exception {
+    runTumbleWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection.
+   */
+  @Test
+  public void testTumbleWindowWithUnbounded() throws Exception {
+    runTumbleWindow(unboundedInput1);
+  }
+
+  private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size`,"
         + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
-        + " FROM TABLE_A "
-        + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
+        + " FROM TABLE_A"
+        + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testTumbleWindow", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(
@@ -191,16 +240,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   /**
-   * GROUP-BY with HOP window(akka sliding_window).
+   * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection.
    */
   @Test
-  public void testHopWindow() throws Exception {
+  public void testHopWindowWithBounded() throws Exception {
+    runHopWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection.
+   */
+  @Test
+  public void testHopWindowWithUnbounded() throws Exception {
+    runHopWindow(unboundedInput1);
+  }
+
+  private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size`,"
         + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
-        + " FROM PCOLLECTION "
-        + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
+        + " FROM PCOLLECTION"
+        + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
     PCollection<BeamSqlRow> result =
-        inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
+        input.apply("testHopWindow", BeamSql.simpleQuery(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(
         Arrays.asList("f_int2", "size", "window_start"),
@@ -240,16 +301,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   /**
-   * GROUP-BY with SESSION window.
+   * GROUP-BY with SESSION window with bounded PCollection.
+   */
+  @Test
+  public void testSessionWindowWithBounded() throws Exception {
+    runSessionWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with SESSION window with unbounded PCollection.
    */
   @Test
-  public void testSessionWindow() throws Exception {
+  public void testSessionWindowWithUnbounded() throws Exception {
+    runSessionWindow(unboundedInput1);
+  }
+
+  private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size`,"
         + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
-        + " FROM TABLE_A "
-        + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
+        + " FROM TABLE_A"
+        + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testSessionWindow", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(
@@ -285,7 +358,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
         + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
         .apply("testWindowOnNonTimestampField", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();
@@ -300,7 +373,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 
     PCollection<BeamSqlRow> result =
-        inputA1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
+        boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
index 308dcb6..57fcbc3 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -28,9 +28,11 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -53,8 +55,13 @@ public class BeamSqlDslBase {
   public static BeamSqlRecordType recordTypeInTableA;
   public static List<BeamSqlRow> recordsInTableA;
 
-  public PCollection<BeamSqlRow> inputA1;
-  public PCollection<BeamSqlRow> inputA2;
+  //bounded PCollections
+  public PCollection<BeamSqlRow> boundedInput1;
+  public PCollection<BeamSqlRow> boundedInput2;
+
+  //unbounded PCollections
+  public PCollection<BeamSqlRow> unboundedInput1;
+  public PCollection<BeamSqlRow> unboundedInput2;
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
@@ -69,11 +76,37 @@ public class BeamSqlDslBase {
 
   @Before
   public void preparePCollections(){
-    inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA)
-        .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+    boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
+        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+
+    boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
+        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+
+    unboundedInput1 = prepareUnboundedPCollection1();
+    unboundedInput2 = prepareUnboundedPCollection2();
+  }
+
+  private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
+    TestStream.Builder<BeamSqlRow> values = TestStream
+        .create(new BeamSqlRowCoder(recordTypeInTableA));
+
+    for (BeamSqlRow row : recordsInTableA) {
+      values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+      values = values.addElements(row);
+    }
+
+    return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
+  }
+
+  private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
+    TestStream.Builder<BeamSqlRow> values = TestStream
+        .create(new BeamSqlRowCoder(recordTypeInTableA));
+
+    BeamSqlRow row = recordsInTableA.get(0);
+    values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+    values = values.addElements(row);
 
-    inputA2 = PBegin.in(pipeline).apply("inputA2", Create.of(recordsInTableA.get(0))
-        .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+    return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
   }
 
   private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{

http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
index f46f6c5..b4b50c1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -25,18 +25,30 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.junit.Test;
 
 /**
- * Tests for WHERE queries.
+ * Tests for WHERE queries with BOUNDED PCollection.
  */
 public class BeamSqlDslFilterTest extends BeamSqlDslBase {
   /**
-   * single filter.
+   * single filter with bounded PCollection.
    */
   @Test
-  public void testSingleFilter() throws Exception {
+  public void testSingleFilterWithBounded() throws Exception {
+    runSingleFilter(boundedInput1);
+  }
+
+  /**
+   * single filter with unbounded PCollection.
+   */
+  @Test
+  public void testSingleFilterWithUnbounded() throws Exception {
+    runSingleFilter(unboundedInput1);
+  }
+
+  private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
 
     PCollection<BeamSqlRow> result =
-        inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
+        input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
 
@@ -44,15 +56,27 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
   }
 
   /**
-   * composite filters.
+   * composite filters with bounded PCollection.
    */
   @Test
-  public void testCompositeFilter() throws Exception {
+  public void testCompositeFilterWithBounded() throws Exception {
+    runCompositeFilter(boundedInput1);
+  }
+
+  /**
+   * composite filters with unbounded PCollection.
+   */
+  @Test
+  public void testCompositeFilterWithUnbounded() throws Exception {
+    runCompositeFilter(unboundedInput1);
+  }
+
+  private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT * FROM TABLE_A"
         + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testCompositeFilter", BeamSql.query(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
@@ -61,14 +85,26 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
   }
 
   /**
-   * nothing return with filters.
+   * nothing return with filters in bounded PCollection.
+   */
+  @Test
+  public void testNoReturnFilterWithBounded() throws Exception {
+    runNoReturnFilter(boundedInput1);
+  }
+
+  /**
+   * nothing return with filters in unbounded PCollection.
    */
   @Test
-  public void testNoReturnFilter() throws Exception {
+  public void testNoReturnFilterWithUnbounded() throws Exception {
+    runNoReturnFilter(unboundedInput1);
+  }
+
+  private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testNoReturnFilter", BeamSql.query(sql));
 
     PAssert.that(result).empty();
@@ -85,7 +121,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
         .apply("testFromInvalidTableName1", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();
@@ -99,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM PCOLLECTION_NA";
 
-    PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }
@@ -112,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
 
-    PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 877fa4f..10f61b0 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -28,18 +28,30 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.junit.Test;
 
 /**
- * Tests for field-project in queries.
+ * Tests for field-project in queries with BOUNDED PCollection.
  */
 public class BeamSqlDslProjectTest extends BeamSqlDslBase {
   /**
-   * select all fields.
+   * select all fields with bounded PCollection.
    */
   @Test
-  public void testSelectAll() throws Exception {
+  public void testSelectAllWithBounded() throws Exception {
+    runSelectAll(boundedInput2);
+  }
+
+  /**
+   * select all fields with unbounded PCollection.
+   */
+  @Test
+  public void testSelectAllWithUnbounded() throws Exception {
+    runSelectAll(unboundedInput2);
+  }
+
+  private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT * FROM PCOLLECTION";
 
     PCollection<BeamSqlRow> result =
-        inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));
+        input.apply("testSelectAll", BeamSql.simpleQuery(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
 
@@ -47,14 +59,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
   }
 
   /**
-   * select partial fields.
+   * select partial fields with bounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsWithBounded() throws Exception {
+    runPartialFields(boundedInput2);
+  }
+
+  /**
+   * select partial fields with unbounded PCollection.
    */
   @Test
-  public void testPartialFields() throws Exception {
+  public void testPartialFieldsWithUnbounded() throws Exception {
+    runPartialFields(unboundedInput2);
+  }
+
+  private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFields", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
@@ -70,14 +94,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
   }
 
   /**
-   * select partial fields for multiple rows.
+   * select partial fields for multiple rows with bounded PCollection.
    */
   @Test
-  public void testPartialFieldsInMultipleRow() throws Exception {
+  public void testPartialFieldsInMultipleRowWithBounded() throws Exception {
+    runPartialFieldsInMultipleRow(boundedInput1);
+  }
+
+  /**
+   * select partial fields for multiple rows with unbounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception {
+    runPartialFieldsInMultipleRow(unboundedInput1);
+  }
+
+  private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
@@ -105,14 +141,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
   }
 
   /**
-   * select partial fields.
+   * select partial fields with bounded PCollection.
    */
   @Test
-  public void testPartialFieldsInRows() throws Exception {
+  public void testPartialFieldsInRowsWithBounded() throws Exception {
+    runPartialFieldsInRows(boundedInput1);
+  }
+
+  /**
+   * select partial fields with unbounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsInRowsWithUnbounded() throws Exception {
+    runPartialFieldsInRows(unboundedInput1);
+  }
+
+  private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFieldsInRows", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
@@ -140,14 +188,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
   }
 
   /**
-   * select literal field.
+   * select literal field with bounded PCollection.
+   */
+  @Test
+  public void testLiteralFieldWithBounded() throws Exception {
+    runLiteralField(boundedInput2);
+  }
+
+  /**
+   * select literal field with unbounded PCollection.
    */
   @Test
-  public void testLiteralField() throws Exception {
+  public void testLiteralFieldWithUnbounded() throws Exception {
+    runLiteralField(unboundedInput2);
+  }
+
+  public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
     String sql = "SELECT 1 as literal_field FROM TABLE_A";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testLiteralField", BeamSql.query(sql));
 
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
@@ -170,7 +230,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     String sql = "SELECT f_int_na FROM TABLE_A";
 
     PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
         .apply("testProjectUnknownField", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();


Mime
View raw message