From: takidau@apache.org
To: commits@beam.apache.org
Date: Wed, 02 Aug 2017 05:08:24 -0000
Message-Id: <7a879352a6ef477fb7673904831ecdc6@git.apache.org>
Subject: [01/59] beam git commit: move dsls/sql to sdks/java/extensions/sql

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL d32aea969 -> 10962a34d


http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
new file mode 100644
index 0000000..b6e11e5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TestPipeline pipeline2 = TestPipeline.create();
+
+  /**
+   * testData.
+   *
+   * <p>The types of the csv fields are:
+   * integer,bigint,float,double,string
+   */
+  private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
+  private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
+
+  private static List<Object[]> testData = Arrays.asList(data1, data2);
+  private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{
+    for (Object[] data : testData) {
+      add(buildRow(data));
+    }
+  }};
+
+  private static Path tempFolder;
+  private static File readerSourceFile;
+  private static File writerTargetFile;
+
+  @Test public void testBuildIOReader() {
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline.run();
+  }
+
+  @Test public void testBuildIOWriter() {
+    new BeamTextCSVTable(buildBeamSqlRowType(),
+        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
+        .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
+            .buildIOWriter());
+    pipeline.run();
+
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+        writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
+
+    // confirm the two reads match
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline2.run();
+  }
+
+  @BeforeClass public static void setUp() throws IOException {
+    tempFolder = Files.createTempDirectory("BeamTextTableTest");
+    readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
+    writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
+  }
+
+  @AfterClass public static void teardownClass() throws IOException {
+    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
+
+      @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+          throws IOException {
+        Files.delete(file);
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+          throws IOException {
+        Files.delete(dir);
+        return FileVisitResult.CONTINUE;
+      }
+    });
+  }
+
+  private static File writeToFile(List<Object[]> rows, String filename) throws IOException {
+    File file = tempFolder.resolve(filename).toFile();
+    OutputStream output = new FileOutputStream(file);
+    writeToStreamAndClose(rows, output);
+    return file;
+  }
+
+  /**
+   * Helper that writes the given rows (adding a newline in between) to a stream, then closes the
+   * stream.
+   */
+  private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) {
+    try (PrintStream writer = new PrintStream(outputStream)) {
+      CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
+      for (Object[] row : rows) {
+        for (Object field : row) {
+          printer.print(field);
+        }
+        printer.println();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private RelProtoDataType buildRowType() {
+    return new RelProtoDataType() {
+
+      @Override public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
+            .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
+            .add("user_name", SqlTypeName.VARCHAR).build();
+      }
+    };
+  }
+
+  private static RelDataType buildRelDataType() {
+    return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER)
+        .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
+        .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
+  }
+
+  private static BeamSqlRowType buildBeamSqlRowType() {
+    return CalciteUtils.toBeamRowType(buildRelDataType());
+  }
+
+  private static BeamSqlRow buildRow(Object[] data) {
+    return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
+  }
+}
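
For reference, the round-trip exercised by the test above translates into application
code roughly as follows. This is a minimal sketch, not part of the commit: it assumes
only the BeamTextCSVTable constructor and the buildIOReader/buildIOWriter calls used
by the test, and the class name and file paths are placeholders.

    import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
    import org.apache.beam.dsls.sql.schema.text.BeamTextCSVTable;
    import org.apache.beam.dsls.sql.utils.CalciteUtils;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class CsvRoundTripSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Same five-column schema the test uses: integer,bigint,float,double,string.
        BeamSqlRowType rowType = CalciteUtils.toBeamRowType(
            BeamQueryPlanner.TYPE_FACTORY.builder()
                .add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
                .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
                .add("user_name", SqlTypeName.VARCHAR).build());
        // Read CSV records as BeamSqlRows, then write them back out as CSV.
        // Both paths are placeholders.
        PCollection<BeamSqlRow> rows =
            new BeamTextCSVTable(rowType, "/tmp/in.csv").buildIOReader(p);
        rows.apply(new BeamTextCSVTable(rowType, "/tmp/out.csv").buildIOWriter());
        p.run().waitUntilFinish();
      }
    }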

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
new file mode 100644
index 0000000..5d5d4fc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema.transform;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamAggregationTransforms}.
+ */
+public class BeamAggregationTransformTest extends BeamTransformBaseTest {
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  private List<AggregateCall> aggCalls;
+
+  private BeamSqlRowType keyType;
+  private BeamSqlRowType aggPartType;
+  private BeamSqlRowType outputType;
+
+  private BeamSqlRowCoder inRecordCoder;
+  private BeamSqlRowCoder keyCoder;
+  private BeamSqlRowCoder aggCoder;
+  private BeamSqlRowCoder outRecordCoder;
+
+  /**
+   * This step is equivalent to the query below.
+   *
+   * <pre>
+   * SELECT `f_int`
+   * , COUNT(*) AS `size`
+   * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
+   * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
+   * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
+   * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
+   * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
+   * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
+   * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
+   * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
+   * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
+   * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
+   * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
+   * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
+   * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
+   * FROM TABLE_NAME
+   * GROUP BY `f_int`
+   * </pre>
+   *
+   * @throws ParseException
+   */
+  @Test
+  public void testCountPerElementBasic() throws ParseException {
+    setupEnvironment();
+
+    PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows));
+
+    // 1. extract fields in the group-by key part
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy",
+        WithKeys
+            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
+        .setCoder(KvCoder.of(keyCoder, inRecordCoder));
+
+    // 2. apply a GroupByKey
+    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
+        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
+        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
+            IterableCoder.of(inRecordCoder)));
+
+    // 3. run the aggregation functions
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
+        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
+            new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
+        .setCoder(KvCoder.of(keyCoder, aggCoder));
+
+    // 4. flatten each KV back into a single record
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
+        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
+    mergedStream.setCoder(outRecordCoder);
+
+    // assert BeamAggregationTransforms.AggregationGroupByKeyFn
+    PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
+
+    // assert BeamAggregationTransforms.AggregationCombineFn
+    PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
+
+    // assert BeamAggregationTransforms.MergeAggregationRecord
+    PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
+
+    p.run();
+  }
+
+  private void setupEnvironment() {
+    prepareAggregationCalls();
+    prepareTypeAndCoder();
+  }
+
+  /**
+   * Create the list of all {@link AggregateCall}s under test.
+   */
+  @SuppressWarnings("deprecation")
+  private void prepareAggregationCalls() {
+    // aggregations for all data types
+    aggCalls = new ArrayList<>();
+    aggCalls.add(
+        new AggregateCall(new SqlCountAggFunction(), false,
+            Arrays.<Integer>asList(),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+            "count")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlSumAggFunction(
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false,
+            Arrays.asList(1),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+            "sum1")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.asList(1),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+            "avg1")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(1),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+            "max1")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(1),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+            "min1")
+    );
+
+    aggCalls.add(
+        new AggregateCall(new SqlSumAggFunction(
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false,
+            Arrays.asList(2),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+            "sum2")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.asList(2),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+            "avg2")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(2),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+            "max2")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(2),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+            "min2")
+    );
+
+    aggCalls.add(
+        new AggregateCall(
+            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)),
+            false,
+            Arrays.asList(3),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+            "sum3")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.asList(3),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+            "avg3")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(3),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+            "max3")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(3),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+            "min3")
+    );
+
+    aggCalls.add(
+        new AggregateCall(
+            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)),
+            false,
+            Arrays.asList(4),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+            "sum4")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.asList(4),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+            "avg4")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(4),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+            "max4")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(4),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+            "min4")
+    );
+
+    aggCalls.add(
+        new AggregateCall(
+            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)),
+            false,
+            Arrays.asList(5),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+            "sum5")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.asList(5),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+            "avg5")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(5),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+            "max5")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(5),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+            "min5")
+    );
+
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(7),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
+            "max7")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(7),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
+            "min7")
+    );
+
+    aggCalls.add(
+        new AggregateCall(
+            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)),
+            false,
+            Arrays.asList(8),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+            "sum8")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.asList(8),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+            "avg8")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.asList(8),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+            "max8")
+    );
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.asList(8),
+            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+            "min8")
+    );
+  }
+
+  /**
+   * Prepare the coders used in the aggregation steps.
+   */
+  private void prepareTypeAndCoder() {
+    inRecordCoder = new BeamSqlRowCoder(inputRowType);
+
+    keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+    keyCoder = new BeamSqlRowCoder(keyType);
+
+    aggPartType = initTypeOfSqlRow(
+        Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
+
+            KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
+            KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
+
+            KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
+            KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
+
+            KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
+            KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
+
+            KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
+            KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
+
+            KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
+            KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
+
+            KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
+
+            KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
+            KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
+        ));
+    aggCoder = new BeamSqlRowCoder(aggPartType);
+
+    outputType = prepareFinalRowType();
+    outRecordCoder = new BeamSqlRowCoder(outputType);
+  }
+
+  /**
+   * Expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+   */
+  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() {
+    return Arrays.asList(
+        KV.of(new BeamSqlRow(keyType, Arrays.asList(inputRows.get(0).getInteger(0))),
+            inputRows.get(0)),
+        KV.of(new BeamSqlRow(keyType, Arrays.asList(inputRows.get(1).getInteger(0))),
+            inputRows.get(1)),
+        KV.of(new BeamSqlRow(keyType, Arrays.asList(inputRows.get(2).getInteger(0))),
+            inputRows.get(2)),
+        KV.of(new BeamSqlRow(keyType, Arrays.asList(inputRows.get(3).getInteger(0))),
+            inputRows.get(3)));
+  }
+
+  /**
+   * Expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
+   */
+  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn()
+      throws ParseException {
+    return Arrays.asList(
+        KV.of(new BeamSqlRow(keyType, Arrays.asList(inputRows.get(0).getInteger(0))),
+            new BeamSqlRow(aggPartType, Arrays.asList(
+                4L,
+                10000L, 2500L, 4000L, 1000L,
+                (short) 10, (short) 2, (short) 4, (short) 1,
+                (byte) 10, (byte) 2, (byte) 4, (byte) 1,
+                10.0F, 2.5F, 4.0F, 1.0F,
+                10.0, 2.5, 4.0, 1.0,
+                format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
+                10, 2, 4, 1
+            )))
+    );
+  }
+
+  /**
+   * Row type of the final output row.
+   */
+  private BeamSqlRowType prepareFinalRowType() {
+    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
+    List<KV<String, SqlTypeName>> columnMetadata =
+        Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
+
+            KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
+            KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
+
+            KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
+            KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
+
+            KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
+            KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
+
+            KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
+            KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
+
+            KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
+            KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
+
+            KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
+
+            KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
+            KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
+        );
+    for (KV<String, SqlTypeName> cm : columnMetadata) {
+      builder.add(cm.getKey(), cm.getValue());
+    }
+    return CalciteUtils.toBeamRowType(builder.build());
+  }
+
+  /**
+   * Expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
+   */
+  private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException {
+    return new BeamSqlRow(outputType, Arrays.asList(
+        1, 4L,
+        10000L, 2500L, 4000L, 1000L,
+        (short) 10, (short) 2, (short) 4, (short) 1,
+        (byte) 10, (byte) 2, (byte) 4, (byte) 1,
+        10.0F, 2.5F, 4.0F, 1.0F,
+        10.0, 2.5, 4.0, 1.0,
+        format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
+        10, 2, 4, 1
+    ));
+  }
+}
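
A note on the expected values above: they follow directly from the four input rows
prepared in BeamTransformBaseTest (the next file in this commit). All four rows share
the group key f_int = 1, so COUNT(*) = 4. The f_long column holds 1000L through 4000L,
giving SUM = 10000, AVG = 10000 / 4 = 2500, MAX = 4000 and MIN = 1000. The SMALLINT,
TINYINT and second INTEGER columns run 1..4, so their sums are 10, averages 2 (integer
division for the integral types), maxes 4 and mins 1; the FLOAT and DOUBLE columns run
1.0..4.0 and average to 2.5. The timestamp extremes are 2017-01-01 01:01:03 (min7) and
2017-01-01 02:04:03 (max7).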

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
new file mode 100644
index 0000000..4045bc8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema.transform;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+
+/**
+ * Shared methods to test PTransforms that execute Beam SQL steps.
+ */
+public class BeamTransformBaseTest {
+  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  public static BeamSqlRowType inputRowType;
+  public static List<BeamSqlRow> inputRows;
+
+  @BeforeClass
+  public static void prepareInput() throws NumberFormatException, ParseException {
+    List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(
+        KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT),
+        KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT),
+        KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE),
+        KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
+        KV.of("f_int2", SqlTypeName.INTEGER)
+    );
+    inputRowType = initTypeOfSqlRow(columnMetadata);
+    inputRows = Arrays.asList(
+        initBeamSqlRow(columnMetadata,
+            Arrays.asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
+                "string_row1", format.parse("2017-01-01 01:01:03"), 1)),
+        initBeamSqlRow(columnMetadata,
+            Arrays.asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
+                "string_row2", format.parse("2017-01-01 01:02:03"), 2)),
+        initBeamSqlRow(columnMetadata,
+            Arrays.asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
+                "string_row3", format.parse("2017-01-01 01:03:03"), 3)),
+        initBeamSqlRow(columnMetadata, Arrays.asList(1, 4000L, Short.valueOf("4"),
+            Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4)));
+  }
+
+  /**
+   * Create a {@code BeamSqlRowType} for the given column metadata.
+   */
+  public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
+    for (KV<String, SqlTypeName> cm : columnMetadata) {
+      builder.add(cm.getKey(), cm.getValue());
+    }
+    return CalciteUtils.toBeamRowType(builder.build());
+  }
+
+  /**
+   * Create an empty row with the given column metadata.
+   */
+  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+    return initBeamSqlRow(columnMetadata, Arrays.asList());
+  }
+
+  /**
+   * Create a row with the given column metadata and a value for each column.
+   */
+  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
+      List<Object> rowValues) {
+    BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
+
+    return new BeamSqlRow(rowType, rowValues);
+  }
+}
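
Because the base class exposes its fixtures as public static fields, a new transform
test in this package can reuse them directly. The following is a minimal sketch, not
part of this commit (MyTransformTest and testFixturesRoundTrip are hypothetical names);
it assumes only the fields and helpers defined above plus the BeamSqlRowCoder already
used by BeamAggregationTransformTest.

    package org.apache.beam.dsls.sql.schema.transform;

    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    /** Hypothetical example of a test reusing the shared fixtures. */
    public class MyTransformTest extends BeamTransformBaseTest {

      @Rule public TestPipeline p = TestPipeline.create();

      @Test public void testFixturesRoundTrip() {
        // inputRows and inputRowType are populated by the base class's @BeforeClass setup.
        PCollection<BeamSqlRow> rows = p.apply(Create.of(inputRows))
            .setCoder(new BeamSqlRowCoder(inputRowType));
        PAssert.that(rows).containsInAnyOrder(inputRows);
        p.run();
      }
    }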