From: lcwik@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Mon, 12 Jun 2017 14:48:40 -0000
Message-Id: <3dd493fb32c44911a58ed5566bce38c4@git.apache.org>
Subject: [3/5] beam git commit: rename SQL to Sql in class name

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 6914883..c0d2783 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -18,9 +18,9 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -55,7 +55,7 @@ import org.joda.time.Duration;
  */
 public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   private int windowFieldIdx = -1;
-  private WindowFn<BeamSQLRow, BoundedWindow> windowFn;
+  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
   private Trigger trigger;
   private Duration allowedLatence = Duration.ZERO;
@@ -71,48 +71,48 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
     if (windowFieldIdx != -1) {
       upstream = upstream.apply("assignEventTimestamp", WithTimestamps
-          .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+          .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
           .setCoder(upstream.getCoder());
     }
 
-    PCollection<BeamSQLRow> windowStream = upstream.apply("window",
-        Window.<BeamSQLRow>into(windowFn)
+    PCollection<BeamSqlRow> windowStream = upstream.apply("window",
+        Window.<BeamSqlRow>into(windowFn)
         .triggering(trigger)
         .withAllowedLateness(allowedLatence)
         .accumulatingFiredPanes());
 
     BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
-    PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy",
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply("exGroupBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
                 windowFieldIdx, groupSet)))
-        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, upstream.getCoder()));
+        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder()));
 
-    PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
-        .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
-            IterableCoder.<BeamSQLRow>of(upstream.getCoder())));
+    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
+        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
+        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
+            IterableCoder.<BeamSqlRow>of(upstream.getCoder())));
 
     BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
 
-    PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
-        Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
+        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
             new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
-                BeamSQLRecordType.from(input.getRowType()))))
-        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
+                BeamSqlRecordType.from(input.getRowType()))))
+        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
 
-    PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            BeamSQLRecordType.from(getRowType()), getAggCallList())));
-    mergedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+            BeamSqlRecordType.from(getRowType()), getAggCallList())));
+    mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return mergedStream;
   }
@@ -120,9 +120,9 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-record used as Group-By keys.
    */
-  private BeamSQLRecordType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSQLRecordType inputRecordType = BeamSQLRecordType.from(relDataType);
-    BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+  private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRecordType inputRecordType = BeamSqlRecordType.from(relDataType);
+    BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
         typeOfKey.addField(inputRecordType.getFieldsName().get(i),
@@ -135,8 +135,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-record that represents the list of aggregation fields.
    */
-  private BeamSQLRecordType exAggFieldsSchema() {
-    BeamSQLRecordType typeOfAggFields = new BeamSQLRecordType();
+  private BeamSqlRecordType exAggFieldsSchema() {
+    BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType();
     for (AggregateCall ac : getAggCallList()) {
       typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName());
     }
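Stripped of the SQL-specific types, the pipeline that buildBeamPipeline assembles above is a plain window / extract-key / group / combine chain. The sketch below reproduces that shape on String elements with a word-count combine; it is illustrative only — AggregationShape and the identity key function are made-up names, not Beam SQL API.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class AggregationShape {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<KV<String, Long>> counts = p
        .apply(Create.of("a", "b", "a"))
        // window the input, as BeamAggregationRel does when a window field is set
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        // extract a grouping key (the DSL uses AggregationGroupByKeyFn here)
        .apply(WithKeys.of(new SerializableFunction<String, String>() {
          @Override public String apply(String s) {
            return s;
          }
        }))
        // group, then combine the grouped values (Combine.groupedValues in the DSL)
        .apply(GroupByKey.<String, String>create())
        .apply(Combine.<String, String, Long>groupedValues(Count.<String>combineFn()));

    p.run().waitUntilFinish();
  }
}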
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 3387071..4c5e113 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.dsls.sql.rel;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn;
+import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -50,20 +50,20 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
-    BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
+    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
-    PCollection<BeamSQLRow> filterStream = upstream.apply(stageName,
-        ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+    PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
+        ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
+    filterStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return filterStream;
   }
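BeamSqlFilterFn's body is not part of this diff, but the pattern it follows is the standard predicate-driven DoFn: hold a serializable predicate (the compiled WHERE expression) and emit only the elements that pass. A minimal sketch under that assumption — RowFilterFn and RowPredicate are illustrative names, not the DSL's classes:

import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;

/** Illustrative only: a predicate-driven filter in the style of BeamSqlFilterFn. */
public class RowFilterFn<T> extends DoFn<T, T> {
  /** Stand-in for BeamSqlExpressionExecutor; evaluates the WHERE clause per element. */
  public interface RowPredicate<T> extends Serializable {
    boolean test(T row);
  }

  private final RowPredicate<T> predicate;

  public RowFilterFn(RowPredicate<T> predicate) {
    this.predicate = predicate;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // emit the element only when the compiled filter expression holds
    if (predicate.test(c.element())) {
      c.output(c.element());
    }
  }
}

Applied as upstream.apply("filter", ParDo.of(new RowFilterFn<>(predicate))), which mirrors the apply(stageName, ParDo.of(...)) call above.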
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index f821700..76a7cb8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -20,9 +20,9 @@ package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.base.Joiner;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PDone;
@@ -57,14 +57,14 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
    * which is the persisted PCollection.
    */
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 38de41e..3fdeb28 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -19,9 +19,9 @@ package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.base.Joiner;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,18 +41,18 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    TupleTag<BeamSQLRow> sourceTupleTag = new TupleTag<BeamSQLRow>(sourceName);
+    TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<BeamSqlRow>(sourceName);
     if (inputPCollections.has(sourceTupleTag)) {
       //choose PCollection from input PCollectionTuple if exists there.
-      PCollection<BeamSQLRow> sourceStream = inputPCollections
-          .get(new TupleTag<BeamSQLRow>(sourceName));
+      PCollection<BeamSqlRow> sourceStream = inputPCollections
+          .get(new TupleTag<BeamSqlRow>(sourceName));
       return sourceStream;
     } else {
       //If not, the source PCollection is provided with BaseBeamTable.buildIOReader().
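The lookup above hinges on TupleTag ids: a collection registered in the PCollectionTuple under the table's qualified name can be fetched with a fresh tag carrying the same id. A self-contained sketch of that mechanism on Strings (SourceLookup and the "beam.orders" name are illustrative, not the DSL's):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

public class SourceLookup {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> orders = p.apply(Create.of("order-1", "order-2"));

    // register the collection under the table's qualified name, as a caller
    // of buildBeamPipeline would before handing the tuple to the planner
    TupleTag<String> tag = new TupleTag<String>("beam.orders");
    PCollectionTuple inputs = PCollectionTuple.of(tag, orders);

    // BeamIOSourceRel's branch: reuse the registered collection if present;
    // otherwise the DSL falls back to BaseBeamTable.buildIOReader(pipeline)
    PCollection<String> source = inputs.has(tag) ? inputs.get(tag) : null;

    p.run().waitUntilFinish();
  }
}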
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index e2645f1..9b7492b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -19,13 +19,13 @@ package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn;
+import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -61,19 +61,19 @@ public class BeamProjectRel extends Project implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
-    BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
+    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
-    PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
-        .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+    PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
+        .of(new BeamSqlProjectFn(getRelTypeName(), executor, BeamSqlRecordType.from(rowType))));
+    projectStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return projectStream;
   }
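A projection is the same ParDo pattern as the filter, except each input element maps to exactly one reshaped output element. A toy stand-in for BeamSqlProjectFn (ProjectFn is an illustrative name; plain CSV strings stand in for rows):

import org.apache.beam.sdk.transforms.DoFn;

/** Illustrative stand-in for BeamSqlProjectFn: maps one input row to one output row. */
public class ProjectFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // a real projection evaluates each SELECT expression via the executor and
    // assembles a new row in the output schema; here we just swap two CSV fields
    String[] fields = c.element().split(",");
    c.output(fields[1] + "," + fields[0]);
  }
}

Note that, exactly as BeamProjectRel does above, the caller must set a coder on the result, because the output schema generally differs from the input's.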
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index ed58090..80d1f39 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -18,7 +18,7 @@ package org.apache.beam.dsls.sql.rel;
 
 import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.rel.RelNode;
@@ -34,5 +34,5 @@ public interface BeamRelNode extends RelNode {
    * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search)
    * algorithm.
    */
-  PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
+  PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 06a4edf..02fc648 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -26,9 +26,9 @@
 import java.util.Comparator;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -123,10 +123,10 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     }
   }
 
-  @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
+    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
         .buildBeamPipeline(inputPCollections);
     Type windowType = upstream.getWindowingStrategy().getWindowFn()
         .getWindowTypeDescriptor().getType();
@@ -135,24 +135,24 @@ public class BeamSortRel extends Sort implements BeamRelNode {
           "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
     }
 
-    BeamSQLRowComparator comparator = new BeamSQLRowComparator(fieldIndices, orientation,
+    BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
         nullsFirst);
     // first find the top (offset + count)
-    PCollection<List<BeamSQLRow>> rawStream =
+    PCollection<List<BeamSqlRow>> rawStream =
         upstream.apply("extractTopOffsetAndFetch",
             Top.of(startIndex + count, comparator).withoutDefaults())
-        .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
+        .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
 
     // strip the `leading offset`
     if (startIndex > 0) {
       rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
-          new SubListFn<BeamSQLRow>(startIndex, startIndex + count)))
-          .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
+          new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
+          .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
     }
 
-    PCollection<BeamSQLRow> orderedStream = rawStream.apply(
-        "flatten", Flatten.<BeamSQLRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+    PCollection<BeamSqlRow> orderedStream = rawStream.apply(
+        "flatten", Flatten.<BeamSqlRow>iterables());
+    orderedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return orderedStream;
   }
@@ -177,12 +177,12 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
   }
 
-  private static class BeamSQLRowComparator implements Comparator<BeamSQLRow>, Serializable {
+  private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
    private List<Integer> fieldsIndices;
    private List<Boolean> orientation;
    private List<Boolean> nullsFirst;
 
-    public BeamSQLRowComparator(List<Integer> fieldsIndices,
+    public BeamSqlRowComparator(List<Integer> fieldsIndices,
        List<Boolean> orientation,
        List<Boolean> nullsFirst) {
      this.fieldsIndices = fieldsIndices;
@@ -190,7 +190,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
       this.nullsFirst = nullsFirst;
     }
 
-    @Override public int compare(BeamSQLRow row1, BeamSQLRow row2) {
+    @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
       for (int i = 0; i < fieldsIndices.size(); i++) {
         int fieldIndex = fieldsIndices.get(i);
         int fieldRet = 0;
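The ORDER BY / LIMIT / OFFSET trick above — Top.of(offset + fetch) followed by a sub-list and a flatten — works for any comparable element. A self-contained sketch on Integers (OrderByLimitShape and Asc are illustrative names; the comparator is reversed so Top's "largest" elements are the numerically smallest, i.e. the head of the ascending order):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;

public class OrderByLimitShape {
  // reversed comparator: Top keeps its "greatest" elements, so reversing
  // natural order makes it keep the smallest values, sorted ascending
  static class Asc implements Comparator<Integer>, Serializable {
    @Override public int compare(Integer a, Integer b) {
      return b.compareTo(a);
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    final int offset = 1;
    final int fetch = 2;

    PCollection<Integer> ordered = p
        .apply(Create.of(5, 3, 9, 1, 7))
        // keep the first (offset + fetch) rows of the ordering, as one sorted List
        .apply(Top.of(offset + fetch, new Asc()).withoutDefaults())
        // strip the leading offset rows (BeamSortRel's SubListFn)
        .apply(ParDo.of(new DoFn<List<Integer>, List<Integer>>() {
          @ProcessElement public void processElement(ProcessContext c) {
            List<Integer> in = c.element();
            c.output(new ArrayList<>(in.subList(Math.min(offset, in.size()), in.size())));
          }
        }))
        // explode the surviving list back into individual rows
        .apply(Flatten.<Integer>iterables());

    p.run().waitUntilFinish();
  }
}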
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index ea59906..9a1887f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -23,9 +23,9 @@
 import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.Create;
@@ -57,17 +57,17 @@ public class BeamValuesRel extends Values implements BeamRelNode {
   }
 
-  @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
-    List<BeamSQLRow> rows = new ArrayList<>(tuples.size());
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
+    String stageName = BeamSqlRelUtils.getStageName(this);
     if (tuples.isEmpty()) {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from(this.getRowType());
+    BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
+      BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
       for (int i = 0; i < tuple.size(); i++) {
         BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
       }
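The hunk cuts off before the final step, but the Create import above suggests the assembled rows are handed to Create. A sketch of that shape with plain CSV strings standing in for BeamSqlRow (ValuesShape and the literal tuples are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class ValuesShape {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // build one "row" per literal tuple, as BeamValuesRel does before
    // handing the list to Create
    List<String> rows = new ArrayList<>();
    for (List<Object> tuple : Arrays.<List<Object>>asList(
        Arrays.<Object>asList(1, "hello"), Arrays.<Object>asList(2, "world"))) {
      rows.add(tuple.get(0) + "," + tuple.get(1));
    }

    PCollection<String> values = p
        .apply("values", Create.of(rows))
        .setCoder(StringUtf8Coder.of());

    p.run().waitUntilFinish();
  }
}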
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 52d2bbd..333bb10 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -39,11 +39,11 @@ import org.apache.calcite.schema.Statistics;
 public abstract class BaseBeamTable implements ScannableTable, Serializable {
   private RelDataType relDataType;
 
-  protected BeamSQLRecordType beamSqlRecordType;
+  protected BeamSqlRecordType beamSqlRecordType;
 
   public BaseBeamTable(RelProtoDataType protoRowType) {
     this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
-    this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
+    this.beamSqlRecordType = BeamSqlRecordType.from(relDataType);
   }
 
   /**
@@ -53,16 +53,16 @@ public abstract class BaseBeamTable implements ScannableTable, Serializable {
   public abstract BeamIOType getSourceType();
 
   /**
-   * create a {@code PCollection<BeamSQLRow>} from source.
+   * create a {@code PCollection<BeamSqlRow>} from source.
    *
    */
-  public abstract PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline);
+  public abstract PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
 
   /**
    * create a {@code IO.write()} instance to write to target.
    *
    */
-  public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter();
+  public abstract PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
 
   @Override
   public Enumerable<Object[]> scan(DataContext root) {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index 1c3ab5b..ff77497 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -26,18 +26,18 @@ import org.apache.calcite.rel.type.RelProtoDataType;
 
 /**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSQLRow>} as a virtual table,
+ * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
  * then a downstream query can query directly.
  */
 public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
-  private PCollection<BeamSQLRow> upstream;
+  private PCollection<BeamSqlRow> upstream;
 
   protected BeamPCollectionTable(RelProtoDataType protoRowType) {
     super(protoRowType);
   }
 
-  public BeamPCollectionTable(PCollection<BeamSQLRow> upstream, RelProtoDataType protoRowType){
+  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, RelProtoDataType protoRowType){
     this(protoRowType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
@@ -50,12 +50,12 @@ public class BeamPCollectionTable extends BaseBeamTable {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return upstream;
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
     throw new BeamInvalidOperatorException("cannot use [BeamPCollectionTable] as target");
   }
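The table contract above boils down to two hooks: buildIOReader produces the table's rows as a PCollection, buildIOWriter produces a write transform. A minimal analogue over a fixed in-memory list, without the Calcite wiring that the real BaseBeamTable carries (InMemoryTable is an illustrative name):

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

/** Illustrative analogue of the BaseBeamTable contract, minus the Calcite wiring. */
public class InMemoryTable implements Serializable {
  private final List<String> rows;

  public InMemoryTable(List<String> rows) {
    this.rows = rows;
  }

  /** Counterpart of buildIOReader: produce the table's rows as a PCollection. */
  public PCollection<String> buildIOReader(Pipeline pipeline) {
    return pipeline.apply(Create.of(rows)).setCoder(StringUtf8Coder.of());
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> scan =
        new InMemoryTable(Arrays.asList("1,hello", "2,world")).buildIOReader(p);
    p.run().waitUntilFinish();
  }
}

BeamPCollectionTable is the degenerate case: its buildIOReader simply returns the wrapped PCollection, and its buildIOWriter throws, since a query result cannot be written back into the wrapped collection.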
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
deleted file mode 100644
index e8fa82f..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ /dev/null
@@ -1,97 +0,0 @@
[97 removed lines: the old BeamSQLRecordType.java, identical to the new BeamSqlRecordType.java below apart from the BeamSQL* naming and a commented-out //@DefaultCoder(BeamSQLRecordTypeCoder.class) annotation that was dropped]

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
deleted file mode 100644
index ca045c8..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ /dev/null
@@ -1,367 +0,0 @@
[367 removed lines: the old BeamSQLRow.java, identical to the new BeamSqlRow.java below apart from the BeamSQL* naming]

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
new file mode 100644
index 0000000..7da08cc
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlRecordType implements Serializable {
+  private List<String> fieldsName = new ArrayList<>();
+  private List<SqlTypeName> fieldsType = new ArrayList<>();
+
+  /**
+   * Generate from {@link RelDataType} which is used to create table.
+   */
+  public static BeamSqlRecordType from(RelDataType tableInfo) {
+    BeamSqlRecordType record = new BeamSqlRecordType();
+    for (RelDataTypeField f : tableInfo.getFieldList()) {
+      record.fieldsName.add(f.getName());
+      record.fieldsType.add(f.getType().getSqlTypeName());
+    }
+    return record;
+  }
+
+  public void addField(String fieldName, SqlTypeName fieldType) {
+    fieldsName.add(fieldName);
+    fieldsType.add(fieldType);
+  }
+
+  /**
+   * Create an instance of {@link RelDataType} so it can be used to create a table.
+   */
+  public RelProtoDataType toRelDataType() {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < fieldsName.size(); ++idx) {
+          builder.add(fieldsName.get(idx), fieldsType.get(idx));
+        }
+        return builder.build();
+      }
+    };
+  }
+
+  public int size() {
+    return fieldsName.size();
+  }
+
+  public List<String> getFieldsName() {
+    return fieldsName;
+  }
+
+  public void setFieldsName(List<String> fieldsName) {
+    this.fieldsName = fieldsName;
+  }
+
+  public List<SqlTypeName> getFieldsType() {
+    return fieldsType;
+  }
+
+  public void setFieldsType(List<SqlTypeName> fieldsType) {
+    this.fieldsType = fieldsType;
+  }
+
+  @Override
+  public String toString() {
+    return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
+  }
+
+}
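BeamSqlRecordType is just a pair of parallel lists, so building a schema by hand is two calls per field, exactly as exKeyFieldsSchema() does in BeamAggregationRel above. A small usage example (RecordTypeDemo is an illustrative name; it assumes the class is on the classpath in the same package):

import org.apache.calcite.sql.type.SqlTypeName;

public class RecordTypeDemo {
  public static void main(String[] args) {
    // assemble a schema field by field
    BeamSqlRecordType type = new BeamSqlRecordType();
    type.addField("id", SqlTypeName.INTEGER);
    type.addField("name", SqlTypeName.VARCHAR);

    System.out.println(type.size());          // 2
    System.out.println(type.getFieldsName()); // [id, name]
    System.out.println(type);                 // RecordType [fieldsName=[id, name], ...]
  }
}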
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..0f82733
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.dsls.sql.exception.InvalidFieldException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
+
+/**
+ * Represent a generic ROW record in Beam SQL.
+ *
+ */
+public class BeamSqlRow implements Serializable {
+
+  private List<Integer> nullFields = new ArrayList<>();
+  private List<Object> dataValues;
+  private BeamSqlRecordType dataType;
+
+  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+  public BeamSqlRow(BeamSqlRecordType dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+      nullFields.add(idx);
+    }
+  }
+
+  public BeamSqlRow(BeamSqlRecordType dataType, List<Object> dataValues) {
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
+  }
+
+  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
+  public void addField(String fieldName, Object fieldValue) {
+    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  }
+
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      return;
+    } else {
+      if (nullFields.contains(index)) {
+        nullFields.remove(nullFields.indexOf(index));
+      }
+    }
+
+    SqlTypeName fieldType = dataType.getFieldsType().get(index);
+    switch (fieldType) {
+      case INTEGER:
+        if (!(fieldValue instanceof Integer)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case SMALLINT:
+        if (!(fieldValue instanceof Short)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TINYINT:
+        if (!(fieldValue instanceof Byte)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case DOUBLE:
+        if (!(fieldValue instanceof Double)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case BIGINT:
+        if (!(fieldValue instanceof Long)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case FLOAT:
+        if (!(fieldValue instanceof Float)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case DECIMAL:
+        if (!(fieldValue instanceof BigDecimal)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case VARCHAR:
+      case CHAR:
+        if (!(fieldValue instanceof String)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TIME:
+        if (!(fieldValue instanceof GregorianCalendar)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TIMESTAMP:
+        if (!(fieldValue instanceof Date)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      default:
+        throw new UnsupportedDataTypeException(fieldType);
+    }
+    dataValues.set(index, fieldValue);
+  }
+
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  }
+
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    Object fieldValue = dataValues.get(fieldIdx);
+    SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
+
+    switch (fieldType) {
+      case INTEGER:
+        if (!(fieldValue instanceof Integer)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case SMALLINT:
+        if (!(fieldValue instanceof Short)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TINYINT:
+        if (!(fieldValue instanceof Byte)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case DOUBLE:
+        if (!(fieldValue instanceof Double)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case DECIMAL:
+        if (!(fieldValue instanceof BigDecimal)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case BIGINT:
+        if (!(fieldValue instanceof Long)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case FLOAT:
+        if (!(fieldValue instanceof Float)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case VARCHAR:
+      case CHAR:
+        if (!(fieldValue instanceof String)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TIME:
+        if (!(fieldValue instanceof GregorianCalendar)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TIMESTAMP:
+        if (!(fieldValue instanceof Date)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      default:
+        throw new UnsupportedDataTypeException(fieldType);
+    }
+  }
+
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamSqlRecordType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamSqlRecordType dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  /**
+   * is the specified field NULL?
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+  }
+
+  /**
+   * Return data fields as key=value.
+   */
+  public String valueInString() {
+    StringBuffer sb = new StringBuffer();
+    for (int idx = 0; idx < size(); ++idx) {
+      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+    }
+    return sb.substring(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamSqlRow other = (BeamSqlRow) obj;
+    return toString().equals(other.toString());
+  }
+
+}
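Two behaviors of BeamSqlRow are easy to miss in the listing: every field starts out null (the constructor seeds nullFields with every index), and addField type-checks the value against the declared SqlTypeName before storing it. A small demonstration against the API shown above (RowDemo is an illustrative name; same-package classpath assumed):

import org.apache.calcite.sql.type.SqlTypeName;

public class RowDemo {
  public static void main(String[] args) {
    BeamSqlRecordType type = new BeamSqlRecordType();
    type.addField("id", SqlTypeName.INTEGER);
    type.addField("name", SqlTypeName.VARCHAR);

    // all fields start out null ...
    BeamSqlRow row = new BeamSqlRow(type);
    System.out.println(row.isNull(0)); // true

    // ... and addField type-checks the value against the declared type
    row.addField("id", 42);       // ok: INTEGER expects an Integer
    row.addField("name", "beam"); // ok: VARCHAR expects a String
    // row.addField("id", 42L);   // would throw InvalidFieldException

    System.out.println(row.getInteger(0));   // 42
    System.out.println(row.valueInString()); // id=42,name=beam
  }
}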
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 14a0f31..6552dd3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
- * A {@link Coder} encodes {@link BeamSQLRow}.
+ * A {@link Coder} encodes {@link BeamSqlRow}.
  */
-public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
-  private BeamSQLRecordType tableSchema;
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+  private BeamSqlRecordType tableSchema;
 
   private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
@@ -49,12 +49,12 @@
   private static final InstantCoder instantCoder = InstantCoder.of();
   private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
 
-  public BeamSqlRowCoder(BeamSQLRecordType tableSchema) {
+  public BeamSqlRowCoder(BeamSqlRecordType tableSchema) {
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public void encode(BeamSQLRow value, OutputStream outStream) throws CoderException, IOException {
+  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
     listCoder.encode(value.getNullFields(), outStream);
     for (int idx = 0; idx < value.size(); ++idx) {
@@ -105,10 +105,10 @@
   }
 
   @Override
-  public BeamSQLRow decode(InputStream inStream) throws CoderException, IOException {
+  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
     List<Integer> nullFields = listCoder.decode(inStream);
-    BeamSQLRow record = new BeamSQLRow(tableSchema);
+    BeamSqlRow record = new BeamSqlRow(tableSchema);
     record.setNullFields(nullFields);
     for (int idx = 0; idx < tableSchema.size(); ++idx) {
@@ -162,7 +162,7 @@
     return record;
   }
 
-  public BeamSQLRecordType getTableSchema() {
+  public BeamSqlRecordType getTableSchema() {
     return tableSchema;
   }
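Note that the coder writes only the null-field markers and the raw values; the schema itself never goes over the wire, which is why decode needs a coder constructed with the same BeamSqlRecordType. A round trip through the signatures shown in this diff (CoderDemo is an illustrative name):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.calcite.sql.type.SqlTypeName;

public class CoderDemo {
  public static void main(String[] args) throws Exception {
    BeamSqlRecordType type = new BeamSqlRecordType();
    type.addField("id", SqlTypeName.INTEGER);

    BeamSqlRow row = new BeamSqlRow(type);
    row.addField("id", 7);

    // encode, then decode against the same schema
    BeamSqlRowCoder coder = new BeamSqlRowCoder(type);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(row, out);
    BeamSqlRow copy = coder.decode(new ByteArrayInputStream(out.toByteArray()));

    System.out.println(copy.getInteger(0)); // 7
  }
}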
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index c7397e1..134cf8f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -35,11 +35,11 @@
  * Utility methods for working with {@code BeamTable}.
  */
 public final class BeamTableUtils {
-  public static BeamSQLRow csvLine2BeamSQLRow(
+  public static BeamSqlRow csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSQLRecordType beamSqlRecordType) {
-    BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+      BeamSqlRecordType beamSqlRecordType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRecordType);
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
@@ -61,7 +61,7 @@ public final class BeamTableUtils {
     return row;
   }
 
-  public static String beamSQLRow2CsvLine(BeamSQLRow row, CSVFormat csvFormat) {
+  public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
     StringWriter writer = new StringWriter();
     try (CSVPrinter printer = csvFormat.print(writer)) {
       for (int i = 0; i < row.size(); i++) {
@@ -74,7 +74,7 @@ public final class BeamTableUtils {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, Object rawObj) {
+  public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
     if (rawObj == null) {
       row.addField(idx, rawObj);
       return;
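These two helpers give a text round trip for rows. A small example using the signatures from this diff (CsvDemo is an illustrative name, and CSVFormat.DEFAULT is an assumption — the table's configured format may differ):

import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.csv.CSVFormat;

public class CsvDemo {
  public static void main(String[] args) {
    BeamSqlRecordType type = new BeamSqlRecordType();
    type.addField("id", SqlTypeName.INTEGER);
    type.addField("name", SqlTypeName.VARCHAR);

    // parse a CSV line into a row; fields are auto-cast to the schema types
    BeamSqlRow row = BeamTableUtils.csvLine2BeamSqlRow(CSVFormat.DEFAULT, "7,beam", type);

    // and print it back out
    String line = BeamTableUtils.beamSqlRow2CsvLine(row, CSVFormat.DEFAULT);
    System.out.println(line); // 7,beam
  }
}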
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index c7397e1..134cf8f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -35,11 +35,11 @@ import org.apache.commons.csv.CSVRecord;
  * Utility methods for working with {@code BeamTable}.
  */
 public final class BeamTableUtils {
-  public static BeamSQLRow csvLine2BeamSQLRow(
+  public static BeamSqlRow csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSQLRecordType beamSqlRecordType) {
-    BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+      BeamSqlRecordType beamSqlRecordType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRecordType);
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
@@ -61,7 +61,7 @@ public final class BeamTableUtils {
     return row;
   }
 
-  public static String beamSQLRow2CsvLine(BeamSQLRow row, CSVFormat csvFormat) {
+  public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
     StringWriter writer = new StringWriter();
     try (CSVPrinter printer = csvFormat.print(writer)) {
       for (int i = 0; i < row.size(); i++) {
@@ -74,7 +74,7 @@ public final class BeamTableUtils {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, Object rawObj) {
+  public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
     if (rawObj == null) {
       row.addField(idx, rawObj);
       return;
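The renamed helpers above pair up as parse/print; a hedged one-liner each, where CSVFormat is org.apache.commons.csv.CSVFormat and `type` is the assumed schema as before:

    // Sketch; round-trips one CSV line through the renamed static helpers.
    BeamSqlRow parsed =
        BeamTableUtils.csvLine2BeamSqlRow(CSVFormat.DEFAULT, "42,beam", type);
    String line = BeamTableUtils.beamSqlRow2CsvLine(parsed, CSVFormat.DEFAULT);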
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 127870c..f8c2553 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.dsls.sql.schema.kafka;
 
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine;
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -50,62 +50,62 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
   }
 
   @Override
-  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
       getPTransformForInput() {
     return new CsvRecorderDecoder(beamSqlRecordType, csvFormat);
   }
 
   @Override
-  public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+  public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput() {
     return new CsvRecorderEncoder(beamSqlRecordType, csvFormat);
   }
 
   /**
-   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
    *
    */
   public static class CsvRecorderDecoder
-      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> {
-    private BeamSQLRecordType recordType;
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
+    private BeamSqlRecordType recordType;
     private CSVFormat format;
-    public CsvRecorderDecoder(BeamSQLRecordType recordType, CSVFormat format) {
+    public CsvRecorderDecoder(BeamSqlRecordType recordType, CSVFormat format) {
       this.recordType = recordType;
       this.format = format;
    }
 
    @Override
-    public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) {
-      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() {
+    public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
           String rowInString = new String(c.element().getValue());
-          c.output(csvLine2BeamSQLRow(format, rowInString, recordType));
+          c.output(csvLine2BeamSqlRow(format, rowInString, recordType));
         }
       }));
     }
   }
 
   /**
-   * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
+   * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
    *
    */
   public static class CsvRecorderEncoder
-      extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> {
-    private BeamSQLRecordType recordType;
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
+    private BeamSqlRecordType recordType;
     private CSVFormat format;
-    public CsvRecorderEncoder(BeamSQLRecordType recordType, CSVFormat format) {
+    public CsvRecorderEncoder(BeamSqlRecordType recordType, CSVFormat format) {
       this.recordType = recordType;
       this.format = format;
     }
 
     @Override
-    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) {
-      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() {
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
-          BeamSQLRow in = c.element();
-          c.output(KV.of(new byte[] {}, beamSQLRow2CsvLine(in, format).getBytes()));
+          BeamSqlRow in = c.element();
+          c.output(KV.of(new byte[] {}, beamSqlRow2CsvLine(in, format).getBytes()));
         }
       }));
     }
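A sketch of the decoder in a pipeline, feeding one in-memory Kafka-style record through it; Create, KvCoder and ByteArrayCoder are standard Beam SDK classes, and `type` remains the assumed schema:

    // Sketch; decodes a single KV<byte[], byte[]> payload into a BeamSqlRow.
    Pipeline p = Pipeline.create();
    PCollection<BeamSqlRow> rows = p
        .apply(Create.of(KV.of(new byte[] {}, "42,beam".getBytes()))
            .withCoder(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())))
        .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(type, CSVFormat.DEFAULT))
        .setCoder(new BeamSqlRowCoder(type));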
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index aa7cf3a..c43fa2c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -38,7 +38,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 /**
  * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
- * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
  *
  */
 public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
@@ -68,14 +68,14 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
     return BeamIOType.UNBOUNDED;
   }
 
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
       getPTransformForInput();
 
-  public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+  public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput();
 
   @Override
-  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("read",
         KafkaIO.<byte[], byte[]>read()
             .withBootstrapServers(bootstrapServers)
@@ -88,13 +88,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   }
 
   @Override
-  public PTransform<PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+  public PTransform<PCollection<BeamSqlRow>, PDone> buildIOWriter() {
     checkArgument(topics != null && topics.size() == 1,
         "Only one topic can be acceptable as output.");
 
-    return new PTransform<PCollection<BeamSQLRow>, PDone>() {
+    return new PTransform<PCollection<BeamSqlRow>, PDone>() {
       @Override
-      public PDone expand(PCollection<BeamSQLRow> input) {
+      public PDone expand(PCollection<BeamSqlRow> input) {
         return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
            KafkaIO.<byte[], byte[]>write()
                .withBootstrapServers(bootstrapServers)

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 41742c7..e575eee 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -57,14 +57,14 @@ public class BeamTextCSVTable extends BeamTextTable {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
             new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
   }
 
   @Override
-  public PTransform<PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+  public PTransform<PCollection<BeamSqlRow>, PDone> buildIOWriter() {
     return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
   }
 }
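A usage sketch for the text table; note the (type, filePattern, csvFormat) constructor shape is an assumption mirroring BeamTextCSVTableIOReader below, not something this hunk shows:

    // Sketch; constructor signature is ASSUMED from the fields used above.
    BeamTextCSVTable table =
        new BeamTextCSVTable(type, "/tmp/in/*.csv", CSVFormat.DEFAULT);
    PCollection<BeamSqlRow> fileRows = table.buildIOReader(Pipeline.create());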
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index 59d77c0..ef0a465 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.io.Serializable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -34,13 +34,13 @@ import org.apache.commons.csv.CSVFormat;
  * IOReader for {@code BeamTextCSVTable}.
  */
 public class BeamTextCSVTableIOReader
-    extends PTransform<PCollection<String>, PCollection<BeamSQLRow>>
+    extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
     implements Serializable {
   private String filePattern;
-  protected BeamSQLRecordType beamSqlRecordType;
+  protected BeamSqlRecordType beamSqlRecordType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOReader(BeamSQLRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRecordType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRecordType = beamSqlRecordType;
@@ -48,12 +48,12 @@ public class BeamTextCSVTableIOReader
   }
 
   @Override
-  public PCollection<BeamSQLRow> expand(PCollection<String> input) {
-    return input.apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
+  public PCollection<BeamSqlRow> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() {
      @ProcessElement
      public void processElement(ProcessContext ctx) {
        String str = ctx.element();
-        ctx.output(csvLine2BeamSQLRow(csvFormat, str, beamSqlRecordType));
+        ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRecordType));
      }
    }));
  }
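The reader transform can be exercised without files, since expand() only parses its input lines; a sketch under the same assumed schema `type`:

    // Sketch; the file pattern is unused by expand() itself, so any
    // placeholder string works here.
    Pipeline p = Pipeline.create();
    PCollection<BeamSqlRow> rows = p
        .apply(Create.of("1,hello", "2,world"))
        .apply(new BeamTextCSVTableIOReader(type, "/unused", CSVFormat.DEFAULT))
        .setCoder(new BeamSqlRowCoder(type));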
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index 9b9cbd2..35a546c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
 
 import java.io.Serializable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -35,25 +35,25 @@ import org.apache.commons.csv.CSVFormat;
 /**
  * IOWriter for {@code BeamTextCSVTable}.
  */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>, PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
     implements Serializable {
   private String filePattern;
-  protected BeamSQLRecordType beamSqlRecordType;
+  protected BeamSqlRecordType beamSqlRecordType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOWriter(BeamSQLRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRecordType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRecordType = beamSqlRecordType;
     this.csvFormat = csvFormat;
   }
 
-  @Override public PDone expand(PCollection<BeamSQLRow> input) {
-    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, String>() {
+  @Override public PDone expand(PCollection<BeamSqlRow> input) {
+    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() {
      @ProcessElement
      public void processElement(ProcessContext ctx) {
-        BeamSQLRow row = ctx.element();
-        ctx.output(beamSQLRow2CsvLine(row, csvFormat));
+        BeamSqlRow row = ctx.element();
+        ctx.output(beamSqlRow2CsvLine(row, csvFormat));
      }
    })).apply(TextIO.write().to(filePattern));
  }
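And the symmetric writer side, continuing the reader sketch above (`p`, `rows` and `type` carry over; the output path is a placeholder):

    // Sketch; rows out to CSV lines, then persisted via TextIO.
    rows.apply("writeCsv",
        new BeamTextCSVTableIOWriter(type, "/tmp/out/result", CSVFormat.DEFAULT));
    p.run();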
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 943c897..a282ff9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -26,8 +26,8 @@ import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -46,11 +46,11 @@ public class BeamAggregationTransforms implements Serializable{
   /**
    * Merge KV to single record.
    */
-  public static class MergeAggregationRecord extends DoFn<KV<BeamSQLRow, BeamSQLRow>, BeamSQLRow> {
-    private BeamSQLRecordType outRecordType;
+  public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+    private BeamSqlRecordType outRecordType;
     private List<String> aggFieldNames;
 
-    public MergeAggregationRecord(BeamSQLRecordType outRecordType, List<AggregateCall> aggList) {
+    public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList) {
       this.outRecordType = outRecordType;
       this.aggFieldNames = new ArrayList<>();
       for (AggregateCall ac : aggList) {
@@ -60,10 +60,10 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamSQLRow outRecord = new BeamSQLRow(outRecordType);
+      BeamSqlRow outRecord = new BeamSqlRow(outRecordType);
       outRecord.updateWindowRange(c.element().getKey(), window);
 
-      KV<BeamSQLRow, BeamSQLRow> kvRecord = c.element();
+      KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
       for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
         outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
       }
@@ -81,7 +81,7 @@ public class BeamAggregationTransforms implements Serializable{
    * extract group-by fields.
    */
   public static class AggregationGroupByKeyFn
-      implements SerializableFunction<BeamSQLRow, BeamSQLRow> {
+      implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
     private List<Integer> groupByKeys;
 
     public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
@@ -94,9 +94,9 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow apply(BeamSQLRow input) {
-      BeamSQLRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
-      BeamSQLRow keyOfRecord = new BeamSQLRow(typeOfKey);
+    public BeamSqlRow apply(BeamSqlRow input) {
+      BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+      BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
       keyOfRecord.updateWindowRange(input, null);
 
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
@@ -105,8 +105,8 @@ public class BeamAggregationTransforms implements Serializable{
       return keyOfRecord;
     }
 
-    private BeamSQLRecordType exTypeOfKeyRecord(BeamSQLRecordType dataType) {
-      BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+    private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+      BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
       for (int idx : groupByKeys) {
         typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx));
       }
@@ -118,7 +118,7 @@ public class BeamAggregationTransforms implements Serializable{
   /**
    * Assign event timestamp.
   */
-  public static class WindowTimestampFn implements SerializableFunction<BeamSQLRow, Instant> {
+  public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
     private int windowFieldIdx = -1;
 
     public WindowTimestampFn(int windowFieldIdx) {
@@ -127,7 +127,7 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public Instant apply(BeamSQLRow input) {
+    public Instant apply(BeamSqlRow input) {
       return new Instant(input.getDate(windowFieldIdx).getTime());
     }
   }
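WindowTimestampFn above turns a timestamp column into the element's event time; a sketch of wiring it through the SDK's WithTimestamps, continuing with `rows` from the earlier sketches (column index 0 is a placeholder):

    // Sketch; stamps each row with the event time read from field 0,
    // mirroring how BeamAggregationRel applies this function upstream
    // of windowing.
    PCollection<BeamSqlRow> stamped = rows.apply("assignEventTimestamp",
        WithTimestamps.of(new BeamAggregationTransforms.WindowTimestampFn(0)));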
@@ -142,8 +142,8 @@ public class BeamAggregationTransforms implements Serializable{
  * 3). SUM/AVG works for INT, LONG, FLOAT, DOUBLE, DECIMAL, SMALLINT, TINYINT;
  *
  */
-  public static class AggregationCombineFn extends CombineFn<BeamSQLRow, BeamSQLRow, BeamSQLRow> {
-    private BeamSQLRecordType aggDataType;
+  public static class AggregationCombineFn extends CombineFn<BeamSqlRow, BeamSqlRow, BeamSqlRow> {
+    private BeamSqlRecordType aggDataType;
     private int countIndex = -1;
 
@@ -151,8 +151,8 @@ public class BeamAggregationTransforms implements Serializable{
     List<String> aggFunctions;
     List<BeamSqlExpression> aggElementExpressions;
 
     public AggregationCombineFn(List<AggregateCall> aggregationCalls,
-        BeamSQLRecordType sourceRowRecordType) {
-      this.aggDataType = new BeamSQLRecordType();
+        BeamSqlRecordType sourceRowRecordType) {
+      this.aggDataType = new BeamSqlRecordType();
       this.aggFunctions = new ArrayList<>();
       this.aggElementExpressions = new ArrayList<>();
@@ -254,8 +254,8 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow createAccumulator() {
-      BeamSQLRow initialRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow createAccumulator() {
+      BeamSqlRow initialRecord = new BeamSqlRow(aggDataType);
       for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
         BeamSqlExpression ex = aggElementExpressions.get(idx);
         String aggFnName = aggFunctions.get(idx);
@@ -351,8 +351,8 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow addInput(BeamSQLRow accumulator, BeamSQLRow input) {
-      BeamSQLRow deltaRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow addInput(BeamSqlRow accumulator, BeamSqlRow input) {
+      BeamSqlRow deltaRecord = new BeamSqlRow(aggDataType);
       for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
         BeamSqlExpression ex = aggElementExpressions.get(idx);
         String aggFnName = aggFunctions.get(idx);
@@ -468,11 +468,11 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow mergeAccumulators(Iterable<BeamSQLRow> accumulators) {
-      BeamSQLRow deltaRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow mergeAccumulators(Iterable<BeamSqlRow> accumulators) {
+      BeamSqlRow deltaRecord = new BeamSqlRow(aggDataType);
 
       while (accumulators.iterator().hasNext()) {
-        BeamSQLRow sa = accumulators.iterator().next();
+        BeamSqlRow sa = accumulators.iterator().next();
         for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
           BeamSqlExpression ex = aggElementExpressions.get(idx);
           String aggFnName = aggFunctions.get(idx);
@@ -575,8 +575,8 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow extractOutput(BeamSQLRow accumulator) {
-      BeamSQLRow finalRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow extractOutput(BeamSqlRow accumulator) {
+      BeamSqlRow finalRecord = new BeamSqlRow(aggDataType);
       for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
         BeamSqlExpression ex = aggElementExpressions.get(idx);
         String aggFnName = aggFunctions.get(idx);