beam-commits mailing list archives

From taki...@apache.org
Subject [1/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.
Date Tue, 15 Aug 2017 18:41:56 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 9eec6a030 -> a1cc5518e


http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
deleted file mode 100644
index 8751bbb..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import org.apache.beam.sdk.coders.BeamRecordCoder;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Tests for BeamSqlRowCoder.
- */
-public class BeamSqlRowCoderTest {
-
-  @Test
-  public void encodeAndDecode() throws Exception {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder()
-            .add("col_tinyint", SqlTypeName.TINYINT)
-            .add("col_smallint", SqlTypeName.SMALLINT)
-            .add("col_integer", SqlTypeName.INTEGER)
-            .add("col_bigint", SqlTypeName.BIGINT)
-            .add("col_float", SqlTypeName.FLOAT)
-            .add("col_double", SqlTypeName.DOUBLE)
-            .add("col_decimal", SqlTypeName.DECIMAL)
-            .add("col_string_varchar", SqlTypeName.VARCHAR)
-            .add("col_time", SqlTypeName.TIME)
-            .add("col_timestamp", SqlTypeName.TIMESTAMP)
-            .add("col_boolean", SqlTypeName.BOOLEAN)
-            .build();
-      }
-    };
-
-    BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(
-        protoRowType.apply(new JavaTypeFactoryImpl(
-            RelDataTypeSystem.DEFAULT)));
-
-    GregorianCalendar calendar = new GregorianCalendar();
-    calendar.setTime(new Date());
-    BeamRecord row = new BeamRecord(beamSQLRowType
-        , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1
-        , BigDecimal.ZERO, "hello", calendar, new Date(), true);
-
-
-    BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
-    CoderProperties.coderDecodeEncodeEqual(coder, row);
-  }
-}
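
A note for readers of this diff: the test removed above asserts one property, that the record coder decodes what it encodes back to an equal value, via CoderProperties.coderDecodeEncodeEqual. A minimal standalone sketch of the same round-trip check, using the SDK's StringUtf8Coder and CoderUtils so it compiles without the SQL module (the class name is illustrative):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

public class CoderRoundTripSketch {
  public static void main(String[] args) throws Exception {
    StringUtf8Coder coder = StringUtf8Coder.of();
    // Encode to bytes and decode back -- the same property the deleted test
    // verified for BeamRecordCoder across all supported SQL column types.
    byte[] bytes = CoderUtils.encodeToByteArray(coder, "hello");
    String decoded = CoderUtils.decodeFromByteArray(coder, bytes);
    if (!"hello".equals(decoded)) {
      throw new AssertionError("coder did not round-trip");
    }
  }
}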

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
deleted file mode 100644
index e5d81fa..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.kafka;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.csv.CSVFormat;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for BeamKafkaCSVTable.
- */
-public class BeamKafkaCSVTableTest {
-  @Rule
-  public TestPipeline pipeline = TestPipeline.create();
-  public static BeamRecord row1;
-  public static BeamRecord row2;
-
-  @BeforeClass
-  public static void setUp() {
-    row1 = new BeamRecord(genRowType(), 1L, 1, 1.0);
-
-    row2 = new BeamRecord(genRowType(), 2L, 2, 2.0);
-  }
-
-  @Test public void testCsvRecorderDecoder() throws Exception {
-    PCollection<BeamRecord> result = pipeline
-        .apply(
-            Create.of("1,\"1\",1.0", "2,2,2.0")
-        )
-        .apply(ParDo.of(new String2KvBytes()))
-        .apply(
-            new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
-        );
-
-    PAssert.that(result).containsInAnyOrder(row1, row2);
-
-    pipeline.run();
-  }
-
-  @Test public void testCsvRecorderEncoder() throws Exception {
-    PCollection<BeamRecord> result = pipeline
-        .apply(
-            Create.of(row1, row2)
-        )
-        .apply(
-            new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT)
-        ).apply(
-            new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
-        );
-
-    PAssert.that(result).containsInAnyOrder(row1, row2);
-
-    pipeline.run();
-  }
-
-  private static BeamRecordSqlType genRowType() {
-    return CalciteUtils.toBeamRowType(new RelProtoDataType() {
-
-      @Override public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT)
-            .add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).build();
-      }
-    }.apply(BeamQueryPlanner.TYPE_FACTORY));
-  }
-
-  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
-      implements Serializable {
-    @ProcessElement
-    public void processElement(ProcessContext ctx) {
-      ctx.output(KV.of(new byte[] {}, ctx.element().getBytes()));
-    }
-  }
-}
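
The CsvRecorderEncoder/CsvRecorderDecoder pair exercised above delegates the text handling to Apache Commons CSV via the CSVFormat passed in. A minimal sketch of that underlying round trip (class name illustrative; assumes commons-csv on the classpath):

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class CsvRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Format one row's fields as a CSV line, then parse the line back.
    String line = CSVFormat.DEFAULT.format(1L, 1, 1.0);
    try (CSVParser parser = CSVParser.parse(line, CSVFormat.DEFAULT)) {
      CSVRecord record = parser.getRecords().get(0);
      // Fields come back as strings; the table's decoder re-applies the
      // declared row type (BIGINT, INTEGER, DOUBLE) to each field.
      System.out.println(record.get(0) + " | " + record.get(1) + " | " + record.get(2));
    }
  }
}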

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
deleted file mode 100644
index 8935c46..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Tests for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableTest {
-
-  @Rule public TestPipeline pipeline = TestPipeline.create();
-  @Rule public TestPipeline pipeline2 = TestPipeline.create();
-
-  /**
-   * testData.
-   *
-   * <p>
-   * The types of the csv fields are:
-   *     integer,bigint,float,double,string
-   * </p>
-   */
-  private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
-  private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
-
-  private static List<Object[]> testData = Arrays.asList(data1, data2);
-  private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{
-    for (Object[] data : testData) {
-      add(buildRow(data));
-    }
-  }};
-
-  private static Path tempFolder;
-  private static File readerSourceFile;
-  private static File writerTargetFile;
-
-  @Test public void testBuildIOReader() {
-    PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
-        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
-    PAssert.that(rows).containsInAnyOrder(testDataRows);
-    pipeline.run();
-  }
-
-  @Test public void testBuildIOWriter() {
-    new BeamTextCSVTable(buildBeamSqlRowType(),
-        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
-        .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
-            .buildIOWriter());
-    pipeline.run();
-
-    PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
-        writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
-
-    // confirm the two reads match
-    PAssert.that(rows).containsInAnyOrder(testDataRows);
-    pipeline2.run();
-  }
-
-  @BeforeClass public static void setUp() throws IOException {
-    tempFolder = Files.createTempDirectory("BeamTextTableTest");
-    readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
-    writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
-  }
-
-  @AfterClass public static void teardownClass() throws IOException {
-    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
-
-      @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
-          throws IOException {
-        Files.delete(file);
-        return FileVisitResult.CONTINUE;
-      }
-
-      @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc)
-          throws IOException {
-        Files.delete(dir);
-        return FileVisitResult.CONTINUE;
-      }
-    });
-  }
-
-  private static File writeToFile(List<Object[]> rows, String filename) throws IOException {
-    File file = tempFolder.resolve(filename).toFile();
-    OutputStream output = new FileOutputStream(file);
-    writeToStreamAndClose(rows, output);
-    return file;
-  }
-
-  /**
-   * Helper that writes the given lines (adding a newline in between) to a stream, then closes the
-   * stream.
-   */
-  private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) {
-    try (PrintStream writer = new PrintStream(outputStream)) {
-      CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
-      for (Object[] row : rows) {
-        for (Object field : row) {
-          printer.print(field);
-        }
-        printer.println();
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  private RelProtoDataType buildRowType() {
-    return new RelProtoDataType() {
-
-      @Override public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
-            .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
-            .add("user_name", SqlTypeName.VARCHAR).build();
-      }
-    };
-  }
-
-  private static RelDataType buildRelDataType() {
-    return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER)
-        .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
-        .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
-  }
-
-  private static BeamRecordSqlType buildBeamSqlRowType() {
-    return CalciteUtils.toBeamRowType(buildRelDataType());
-  }
-
-  private static BeamRecord buildRow(Object[] data) {
-    return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data));
-  }
-}
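
The teardown above is the standard NIO recipe for deleting a non-empty directory: walk it depth-first, removing files on visit and each directory after its contents. A self-contained sketch of the same idiom (class name illustrative):

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class TempDirCleanupSketch {
  public static void main(String[] args) throws IOException {
    Path tempFolder = Files.createTempDirectory("example");
    Files.createFile(tempFolder.resolve("data.txt"));
    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        Files.delete(file);                  // delete each regular file
        return FileVisitResult.CONTINUE;
      }
      @Override
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
        Files.delete(dir);                   // then the now-empty directory
        return FileVisitResult.CONTINUE;
      }
    });
  }
}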

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
deleted file mode 100644
index 64f2ccd..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema.transform;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.coders.BeamRecordCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.fun.SqlAvgAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.type.BasicSqlType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link BeamAggregationTransforms}.
- *
- */
-public class BeamAggregationTransformTest extends BeamTransformBaseTest{
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  private List<AggregateCall> aggCalls;
-
-  private BeamRecordSqlType keyType;
-  private BeamRecordSqlType aggPartType;
-  private BeamRecordSqlType outputType;
-
-  private BeamRecordCoder inRecordCoder;
-  private BeamRecordCoder keyCoder;
-  private BeamRecordCoder aggCoder;
-  private BeamRecordCoder outRecordCoder;
-
-  /**
-   * This step is equivalent to the query below.
-   * <pre>
-   * SELECT `f_int`
-   * , COUNT(*) AS `size`
-   * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
-   * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
-   * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
-   * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
-   * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
-   * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
-   * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
-   * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
-   * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
-   * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
-   * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
-   * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
-   * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
-   * FROM TABLE_NAME
-   * GROUP BY `f_int`
-   * </pre>
-   * @throws ParseException
-   */
-  @Test
-  public void testCountPerElementBasic() throws ParseException {
-    setupEnvironment();
-
-    PCollection<BeamRecord> input = p.apply(Create.of(inputRows));
-
-    //1. extract fields in group-by key part
-    PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy",
-        WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
-        .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder));
-
-    //2. apply a GroupByKey.
-    PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create())
-        .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder,
-            IterableCoder.<BeamRecord>of(inRecordCoder)));
-
-    //3. run aggregation functions
-    PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation",
-        Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues(
-            new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
-        .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder));
-
-    //4. flat KV to a single record
-    PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
-    mergedStream.setCoder(outRecordCoder);
-
-    //assert function BeamAggregationTransform.AggregationGroupByKeyFn
-    PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
-
-    //assert BeamAggregationTransform.AggregationCombineFn
-    PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
-
-  //assert BeamAggregationTransform.MergeAggregationRecord
-    PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
-
-    p.run();
-}
-
-  private void setupEnvironment() {
-    prepareAggregationCalls();
-    prepareTypeAndCoder();
-  }
-
-  /**
-   * create list of all {@link AggregateCall}.
-   */
-  @SuppressWarnings("deprecation")
-  private void prepareAggregationCalls() {
-    //aggregations for all data type
-    aggCalls = new ArrayList<>();
-    aggCalls.add(
-        new AggregateCall(new SqlCountAggFunction(), false,
-            Arrays.<Integer>asList(),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "count")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlSumAggFunction(
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "sum1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "avg1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "max1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "min1")
-        );
-
-    aggCalls.add(
-        new AggregateCall(new SqlSumAggFunction(
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "sum2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "avg2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "max2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "min2")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)),
-            false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "sum3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "avg3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "max3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "min3")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)),
-            false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "sum4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "avg4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "max4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "min4")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)),
-            false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "sum5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "avg5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "max5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "min5")
-        );
-
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(7),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
-            "max7")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(7),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
-            "min7")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)),
-            false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "sum8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "avg8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "max8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "min8")
-        );
-  }
-
-  /**
-   * Coders used in aggregation steps.
-   */
-  private void prepareTypeAndCoder() {
-    inRecordCoder = inputRowType.getRecordCoder();
-
-    keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
-    keyCoder = keyType.getRecordCoder();
-
-    aggPartType = initTypeOfSqlRow(
-        Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
-
-            KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
-            KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
-            KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
-            KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
-            KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
-            KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
-            KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
-            KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
-            KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
-            KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
-            KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
-            KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
-            KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
-            ));
-    aggCoder = aggPartType.getRecordCoder();
-
-    outputType = prepareFinalRowType();
-    outRecordCoder = outputType.getRecordCoder();
-  }
-
-  /**
-   * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
-   */
-  private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() {
-    return Arrays.asList(
-        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-            inputRows.get(0)),
-        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
-            inputRows.get(1)),
-        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
-            inputRows.get(2)),
-        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
-            inputRows.get(3)));
-  }
-
-  /**
-   * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
-   */
-  private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn()
-      throws ParseException {
-    return Arrays.asList(
-            KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-                new BeamRecord(aggPartType, Arrays.<Object>asList(
-                    4L,
-                    10000L, 2500L, 4000L, 1000L,
-                    (short) 10, (short) 2, (short) 4, (short) 1,
-                    (byte) 10, (byte) 2, (byte) 4, (byte) 1,
-                    10.0F, 2.5F, 4.0F, 1.0F,
-                    10.0, 2.5, 4.0, 1.0,
-                    format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
-                    10, 2, 4, 1
-                    )))
-            );
-  }
-
-  /**
-   * Row type of final output row.
-   */
-  private BeamRecordSqlType prepareFinalRowType() {
-    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
-    List<KV<String, SqlTypeName>> columnMetadata =
-        Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
-
-        KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
-        KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
-        KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
-        KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
-        KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
-        KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
-        KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
-        KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
-        KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
-        KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
-        KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
-        KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
-        KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
-        );
-    for (KV<String, SqlTypeName> cm : columnMetadata) {
-      builder.add(cm.getKey(), cm.getValue());
-    }
-    return CalciteUtils.toBeamRowType(builder.build());
-  }
-
-  /**
-   * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
-   */
-  private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException {
-    return new BeamRecord(outputType, Arrays.<Object>asList(
-        1, 4L,
-        10000L, 2500L, 4000L, 1000L,
-        (short) 10, (short) 2, (short) 4, (short) 1,
-        (byte) 10, (byte) 2, (byte) 4, (byte) 1,
-        10.0F, 2.5F, 4.0F, 1.0F,
-        10.0, 2.5, 4.0, 1.0,
-        format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
-        10, 2, 4, 1
-        ));
-  }
-}
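
Steps 2 and 3 of the pipeline above (a GroupByKey followed by Combine.groupedValues) are a general Beam pattern; the SQL-specific part is only the AggregationAdaptor combine fn. A minimal sketch of the same shape with a built-in combine fn (class name illustrative; running it assumes a runner such as the DirectRunner on the classpath):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GroupedValuesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, Long>> input =
        p.apply(Create.of(KV.of("k1", 1L), KV.of("k1", 2L), KV.of("k2", 3L)));
    // Group values per key, then fold each Iterable<Long> into one Long --
    // the same shape the test drives with its SQL aggregation adaptor.
    input.apply(GroupByKey.<String, Long>create())
         .apply(Combine.<String, Long, Long>groupedValues(Sum.ofLongs()));
    p.run().waitUntilFinish();
  }
}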

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
deleted file mode 100644
index da6e95b..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema.transform;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
-
-/**
- * Shared methods for testing PTransforms that execute Beam SQL steps.
- *
- */
-public class BeamTransformBaseTest {
-  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  public static BeamRecordSqlType inputRowType;
-  public static List<BeamRecord> inputRows;
-
-  @BeforeClass
-  public static void prepareInput() throws NumberFormatException, ParseException{
-    List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(
-        KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT),
-        KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT),
-        KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE),
-        KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
-        KV.of("f_int2", SqlTypeName.INTEGER)
-        );
-    inputRowType = initTypeOfSqlRow(columnMetadata);
-    inputRows = Arrays.asList(
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
-                "string_row1", format.parse("2017-01-01 01:01:03"), 1)),
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
-                "string_row2", format.parse("2017-01-01 01:02:03"), 2)),
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
-                "string_row3", format.parse("2017-01-01 01:03:03"), 3)),
-        initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"),
-            Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"),
4)));
-  }
-
-  /**
-   * create a {@code BeamSqlRowType} for given column metadata.
-   */
-  public static BeamRecordSqlType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
-    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
-    for (KV<String, SqlTypeName> cm : columnMetadata) {
-      builder.add(cm.getKey(), cm.getValue());
-    }
-    return CalciteUtils.toBeamRowType(builder.build());
-  }
-
-  /**
-   * Create an empty row with given column metadata.
-   */
-  public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
-    return initBeamSqlRow(columnMetadata, Arrays.asList());
-  }
-
-  /**
-   * Create a row with given column metadata, and values for each column.
-   *
-   */
-  public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
-      List<Object> rowValues){
-    BeamRecordSqlType rowType = initTypeOfSqlRow(columnMetadata);
-
-    return new BeamRecord(rowType, rowValues);
-  }
-
-}
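
The initTypeOfSqlRow helper above wraps Calcite's type-factory builder: name/type pairs go in, a Calcite row type comes out, and CalciteUtils.toBeamRowType maps it to a BeamRecordSqlType. A standalone sketch of just the Calcite builder idiom (class name illustrative):

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeName;

public class RowTypeBuilderSketch {
  public static void main(String[] args) {
    // Build a two-column row type: (f_int INTEGER, f_long BIGINT).
    RelDataType rowType = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
        .builder()
        .add("f_int", SqlTypeName.INTEGER)
        .add("f_long", SqlTypeName.BIGINT)
        .build();
    System.out.println(rowType.getFullTypeString());
  }
}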

