beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ming...@apache.org
Subject [34/66] [abbrv] beam git commit: move all implementation classes/packages into impl package
Date Mon, 11 Sep 2017 20:19:17 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
new file mode 100644
index 0000000..dd01a87
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The core component that takes a SQL statement all the way from parsing and validation,
+ * through conversion into a {@link BeamRelNode} tree, to a Beam pipeline.
+ *
+ * <p>All steps run on the single Calcite {@link Planner} built in the constructor.
+ */
+public class BeamQueryPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
+
+  protected final Planner planner;
+  private final Map<String, BaseBeamTable> sourceTables = new HashMap<>();
+
+  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  /**
+   * Builds the Calcite planner for {@code schema} and records every table of the schema.
+   *
+   * @param schema default schema of the planner; each of its tables is cast to
+   *     {@link BaseBeamTable}, so a table of any other type fails with a ClassCastException
+   */
+  public BeamQueryPlanner(SchemaPlus schema) {
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    // Standard SQL operators, chained with the catalog reader of the given schema.
+    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+    sqlOperatorTables.add(SqlStdOperatorTable.instance());
+    sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
+        Collections.<String>emptyList(), TYPE_FACTORY));
+
+    FrameworkConfig config = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+        .build();
+    this.planner = Frameworks.getPlanner(config);
+
+    for (String t : schema.getTableNames()) {
+      sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
+    }
+  }
+
+  /**
+   * Parses the input SQL query and returns a {@link SqlNode} as its grammar tree.
+   *
+   * <p>NOTE(review): unlike {@link #convertToBeamRel(String)}, the planner is not closed
+   * here -- confirm whether callers rely on it staying in its parsed state.
+   */
+  public SqlNode parseQuery(String sqlQuery) throws SqlParseException {
+    return planner.parse(sqlQuery);
+  }
+
+  /**
+   * Translates a SQL statement into a Beam data flow that is linked to the given
+   * {@code basePipeline}. The final output stream is returned as a {@code PCollection}
+   * so that more operations can be applied to it.
+   */
+  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline,
+      BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode relNode = convertToBeamRel(sqlStatement);
+
+    // The input PCollectionTuple is empty; it is rebuilt in BeamIOSourceRel.
+    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
+  }
+
+  /**
+   * Parses and validates the input query, then converts it into a {@link BeamRelNode}
+   * tree. The planner is closed afterwards, whether or not the conversion succeeded.
+   */
+  public BeamRelNode convertToBeamRel(String sqlStatement)
+      throws ValidationException, RelConversionException, SqlParseException {
+    try {
+      return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+    } finally {
+      planner.close();
+    }
+  }
+
+  /** Validates {@code sqlNode}, converts it to a relational tree and transforms that tree. */
+  private RelNode validateAndConvert(SqlNode sqlNode)
+      throws ValidationException, RelConversionException {
+    SqlNode validated = planner.validate(sqlNode);
+    LOG.info("SQL:\n{}", validated);
+    return convertToBeamRel(planner.rel(validated).rel);
+  }
+
+  /** Transforms {@code relNode} with rule set 0 into the {@link BeamLogicalConvention}. */
+  private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
+    LOG.info("SQLPlan>\n{}", RelOptUtil.toString(relNode));
+    // PlannerImpl.transform() optimizes RelNode with ruleset
+    return planner.transform(
+        0, relNode.getTraitSet().plus(BeamLogicalConvention.INSTANCE), relNode);
+  }
+
+  /** Returns the live, mutable map of the schema's tables, keyed by table name. */
+  public Map<String, BaseBeamTable> getSourceTables() {
+    return sourceTables;
+  }
+
+  /** Returns the underlying Calcite planner. */
+  public Planner getPlanner() {
+    return planner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
new file mode 100644
index 0000000..5734653
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * Beam's customization of Calcite's default {@link RelDataTypeSystem}: it overrides the
+ * maximum numeric precision and scale, both of which are 38 here.
+ */
+public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
+  private static final int MAX_NUMERIC_DIGITS = 38;
+  public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
+
+  @Override
+  public int getMaxNumericPrecision() {
+    return MAX_NUMERIC_DIGITS;
+  }
+
+  @Override
+  public int getMaxNumericScale() {
+    return MAX_NUMERIC_DIGITS;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
new file mode 100644
index 0000000..d3c9871
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamFilterRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSourceRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamMinusRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamProjectRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamValuesRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RuleSet;
+
+/**
+ * {@link RuleSet}s used by {@link BeamQueryPlanner}. The rules translate a standard
+ * Calcite {@link RelNode} tree into a tree that is represented with {@link BeamRelNode}s.
+ */
+public class BeamRuleSets {
+  /** Rules that convert Calcite relational nodes into their Beam counterparts. */
+  private static final ImmutableSet<RelOptRule> CALCITE_TO_BEAM_CONVERSION_RULES =
+      ImmutableSet.<RelOptRule>of(
+          BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE,
+          BeamIOSinkRule.INSTANCE, BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE,
+          BeamValuesRule.INSTANCE, BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE,
+          BeamUnionRule.INSTANCE, BeamJoinRule.INSTANCE);
+
+  /** Returns a one-element array whose {@link RuleSet} holds all conversion rules. */
+  public static RuleSet[] getRuleSets() {
+    // The rule set is immutable, so it is shared as is instead of being copied per call.
+    return new RuleSet[] {new BeamRuleSet(CALCITE_TO_BEAM_CONVERSION_RULES)};
+  }
+
+  /** {@link RuleSet} backed by an immutable set of rules. */
+  private static class BeamRuleSet implements RuleSet {
+    private final ImmutableSet<RelOptRule> rules;
+
+    BeamRuleSet(ImmutableSet<RelOptRule> rules) {
+      this.rules = rules;
+    }
+
+    BeamRuleSet(ImmutableList<RelOptRule> rules) {
+      this(ImmutableSet.copyOf(rules));
+    }
+
+    @Override
+    public Iterator<RelOptRule> iterator() {
+      return rules.iterator();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/package-info.java
new file mode 100644
index 0000000..a5552b3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner} is the main interface.
+ * It defines data sources, validates a SQL statement, and converts it into a Beam
+ * pipeline.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.planner;

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
new file mode 100644
index 0000000..8e78684
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.joda.time.Duration;
+
+/**
+ * {@link BeamRelNode} to replace an {@link Aggregate} node. Besides the aggregation itself
+ * it carries the window function, trigger and allowed lateness applied before grouping.
+ */
+public class BeamAggregationRel extends Aggregate implements BeamRelNode {
+  /** Index of the input field used as event time, or -1 if there is none. */
+  private final int windowFieldIdx;
+  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
+  private Trigger trigger;
+  private final Duration allowedLateness;
+
+  /** Creates the node; {@code windowFieldIdx} is -1 when there is no event-time field. */
+  public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+      boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls, WindowFn windowFn, Trigger trigger, int windowFieldIdx,
+      Duration allowedLateness) {
+    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+    this.windowFn = windowFn;
+    this.trigger = trigger;
+    this.windowFieldIdx = windowFieldIdx;
+    this.allowedLateness = allowedLateness;
+  }
+
+  /** Builds the input pipeline, then windows, keys, combines and merges its rows. */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    RelNode input = getInput();
+    String stageName = BeamSqlRelUtils.getStageName(this) + "_";
+
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
+    // -1 marks the absence of an event-time field; otherwise timestamps come from it.
+    if (windowFieldIdx != -1) {
+      upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
+          .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+          .setCoder(upstream.getCoder());
+    }
+
+    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
+        Window.into(windowFn)
+        .triggering(trigger)
+        .withAllowedLateness(allowedLateness)
+        .accumulatingFiredPanes());
+
+    // The key coder covers the group-by fields except the window field.
+    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
+        stageName + "exCombineBy",
+        WithKeys
+            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
+                windowFieldIdx, groupSet)))
+        .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
+
+    BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
+        stageName + "combineBy",
+        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
+            new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
+                CalciteUtils.toBeamRowType(input.getRowType()))))
+        .setCoder(KvCoder.of(keyCoder, aggCoder));
+
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
+        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
+            CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
+    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+
+    return mergedStream;
+  }
+
+  /** Type of the sub-row that is used as Group-By key; it excludes the window field. */
+  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (int i : groupSet.asList()) {
+      if (i != windowFieldIdx) {
+        fieldNames.add(inputRowType.getFieldsName().get(i));
+        fieldTypes.add(inputRowType.getFieldsType().get(i));
+      }
+    }
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  /** Type of the sub-row that holds one field per aggregation call. */
+  private BeamSqlRowType exAggFieldsSchema() {
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (AggregateCall ac : getAggCallList()) {
+      fieldNames.add(ac.name);
+      fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
+    }
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
+      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    return new BeamAggregationRel(getCluster(), traitSet, input, indicator, groupSet,
+        groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLateness);
+  }
+
+  /** Replaces the window function applied to the input before grouping. */
+  public void setWindowFn(WindowFn windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  /** Replaces the trigger used by the window transform. */
+  public void setTrigger(Trigger trigger) {
+    this.trigger = trigger;
+  }
+
+  /** Adds the group, window, trigger, event-time and aggregate items to {@code pw}. */
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    // We skip the "groups" element if it is a singleton of "group".
+    pw.item("group", groupSet)
+        .itemIf("window", windowFn, windowFn != null)
+        .itemIf("trigger", trigger, trigger != null)
+        .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+        .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+        .itemIf("indicator", indicator, indicator)
+        .itemIf("aggs", aggCalls, pw.nest());
+    if (!pw.nest()) {
+      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+      }
+    }
+    return pw;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
new file mode 100644
index 0000000..b453db4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * {@link BeamRelNode} that stands in for a Calcite {@code Filter} node.
+ */
+public class BeamFilterRel extends Filter implements BeamRelNode {
+
+  public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+      RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  /** Creates a {@code BeamFilterRel} in the same cluster over the given input and condition. */
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new BeamFilterRel(getCluster(), traitSet, input, condition);
+  }
+
+  /**
+   * Builds the pipeline of the input node and applies a {@link BeamSqlFilterFn} to its rows.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    RelNode inputNode = getInput();
+    String filterStage = BeamSqlRelUtils.getStageName(this);
+    PCollection<BeamSqlRow> source =
+        BeamSqlRelUtils.getBeamRelInput(inputNode).buildBeamPipeline(inputPCollections, sqlEnv);
+
+    BeamSqlExpressionExecutor conditionExecutor = new BeamSqlFnExecutor(this);
+    BeamSqlFilterFn filterFn = new BeamSqlFilterFn(getRelTypeName(), conditionExecutor);
+
+    // The output coder is derived from the row type of this node.
+    return source.apply(filterStage, ParDo.of(filterFn))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
new file mode 100644
index 0000000..d5eb210
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * {@link BeamRelNode} that stands in for a Calcite {@code TableModify} node.
+ */
+public class BeamIOSinkRel extends TableModify implements BeamRelNode {
+  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+      List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+        sourceExpressionList, flattened);
+  }
+
+  /** Re-creates this sink over the single node in {@code inputs} with the given traits. */
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    RelNode onlyInput = sole(inputs);
+    return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), onlyInput,
+        getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+  }
+
+  /**
+   * Applies the IO writer of the target table to the rows of the input node. The returned
+   * PCollection is the input PCollection itself, i.e. the rows that are persisted.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    RelNode inputNode = getInput();
+    String sinkStage = BeamSqlRelUtils.getStageName(this);
+    PCollection<BeamSqlRow> rowsToWrite =
+        BeamSqlRelUtils.getBeamRelInput(inputNode).buildBeamPipeline(inputPCollections, sqlEnv);
+
+    // Tables are looked up in the environment by their dot-joined qualified name.
+    String qualifiedTableName = Joiner.on('.').join(getTable().getQualifiedName());
+    BaseBeamTable sink = sqlEnv.findTable(qualifiedTableName);
+
+    // The writer's result is not needed; the input rows are handed back to the caller.
+    rowsToWrite.apply(sinkStage, sink.buildIOWriter());
+
+    return rowsToWrite;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
new file mode 100644
index 0000000..5179eba
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+
+/**
+ * {@code BeamRelNode} to replace a {@code TableScan} node.
+ *
+ * <p>The rows come either from a {@code PCollection} supplied in the input
+ * {@code PCollectionTuple} or from the IO reader of the table found in the {@code BeamSqlEnv}.
+ */
+public class BeamIOSourceRel extends TableScan implements BeamRelNode {
+
+  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    super(cluster, traitSet, table);
+  }
+
+  /**
+   * Resolves the rows of the scanned table.
+   *
+   * <p>A {@code PCollection} tagged with the table's dot-joined qualified name in
+   * {@code inputPCollections} takes precedence; otherwise the table is looked up in
+   * {@code sqlEnv} and its IO reader is applied to the pipeline of {@code inputPCollections}.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    String qualifiedTableName = Joiner.on('.').join(getTable().getQualifiedName());
+    TupleTag<BeamSqlRow> tableTag = new TupleTag<>(qualifiedTableName);
+
+    if (!inputPCollections.has(tableTag)) {
+      // Nothing was handed in for this table, so the source PCollection is provided by
+      // BaseBeamTable.buildIOReader().
+      BaseBeamTable registeredTable = sqlEnv.findTable(qualifiedTableName);
+      return registeredTable.buildIOReader(inputPCollections.getPipeline())
+          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    }
+
+    // The rows were supplied directly in the input PCollectionTuple.
+    return inputPCollections.get(tableTag);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
new file mode 100644
index 0000000..d6ab52d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Intersect} node.
+ *
+ * <p>This is used to combine two SELECT statements, but returns rows only from the
+ * first SELECT statement that are identical to a row in the second SELECT statement.
+ *
+ * <p>Pipeline construction is handed off to a {@code BeamSetOperatorRelBase} created with
+ * {@code OpType.INTERSECT}.
+ */
+public class BeamIntersectRel extends Intersect implements BeamRelNode {
+
+  private final BeamSetOperatorRelBase delegate;
+
+  public BeamIntersectRel(
+      RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new BeamSetOperatorRelBase(
+        this, BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+  }
+
+  /** Creates a new {@code BeamIntersectRel} on the same cluster with the given arguments. */
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
+  }
+
+  /** Forwards to the delegate's {@code buildBeamPipeline} with the same arguments. */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
new file mode 100644
index 0000000..2de2a89
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Join} node.
+ *
+ * <p>Support for join can be categorized into 3 cases:
+ * <ul>
+ *   <li>BoundedTable JOIN BoundedTable</li>
+ *   <li>UnboundedTable JOIN UnboundedTable</li>
+ *   <li>BoundedTable JOIN UnboundedTable</li>
+ * </ul>
+ *
+ * <p>For the first two cases, a standard join is utilized as long as the windowFn of both
+ * sides match.
+ *
+ * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
+ * constraints:
+ *
+ * <ul>
+ *   <li>{@code FULL OUTER JOIN} is not supported.</li>
+ *   <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side.</li>
+ *   <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right
+ *   side.</li>
+ * </ul>
+ *
+ *
+ * <p>There are also some general constraints:
+ *
+ * <ul>
+ *  <li>Only equi-join is supported: the condition must be a single {@code =} between a column
+ *  of the left input and a column of the right input, or an {@code AND} of such
+ *  comparisons.</li>
+ *  <li>CROSS JOIN is not supported.</li>
+ * </ul>
+ */
+public class BeamJoinRel extends Join implements BeamRelNode {
+  public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+    super(cluster, traits, left, right, condition, variablesSet, joinType);
+  }
+
+  /**
+   * Creates a new {@code BeamJoinRel} on the same cluster from the given arguments, reusing
+   * this node's {@code variablesSet}; {@code semiJoinDone} is not used.
+   */
+  @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
+      RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
+        joinType);
+  }
+
+  /**
+   * Builds both input pipelines, keys each side by its join columns and joins them.
+   *
+   * <p>When both inputs have the same boundedness a standard join is used and the two
+   * {@code WindowFn}s must be compatible; otherwise the bounded side becomes a side input of
+   * the unbounded side.
+   *
+   * @throws IllegalArgumentException if a standard join is required but the window functions
+   *     of the two inputs are incompatible
+   * @throws UnsupportedOperationException if the join condition, or the join type for the
+   *     given combination of inputs, is not supported
+   */
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv)
+      throws Exception {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+    PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+
+    final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+
+    String stageName = BeamSqlRelUtils.getStageName(this);
+
+    // extract the join fields
+    List<Pair<Integer, Integer>> pairs = extractJoinColumns(
+        leftRelNode.getRowType().getFieldCount());
+
+    // build the extract key type
+    // the name of the join field is not important
+    List<String> names = new ArrayList<>(pairs.size());
+    List<Integer> types = new ArrayList<>(pairs.size());
+    for (int i = 0; i < pairs.size(); i++) {
+      names.add("c" + i);
+      types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
+    }
+    BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
+
+    Coder<BeamSqlRow> extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
+
+    // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
+        .apply(stageName + "_left_ExtractJoinFields",
+            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+        .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
+
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
+        .apply(stageName + "_right_ExtractJoinFields",
+            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+        .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
+
+    // prepare the NullRows
+    BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
+    BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
+
+    // IsBounded has exactly two values, so "same boundedness" and "mixed boundedness"
+    // together cover every combination of inputs.
+    boolean leftBounded = leftRows.isBounded() == PCollection.IsBounded.BOUNDED;
+    boolean rightBounded = rightRows.isBounded() == PCollection.IsBounded.BOUNDED;
+
+    if (leftBounded == rightBounded) {
+      // a regular join
+      WindowFn<?, ?> leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
+      WindowFn<?, ?> rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
+      try {
+        leftWinFn.verifyCompatibility(rightWinFn);
+      } catch (IncompatibleWindowException e) {
+        throw new IllegalArgumentException(
+            "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+      }
+
+      return standardJoin(extractedLeftRows, extractedRightRows,
+          leftNullRow, rightNullRow, stageName);
+    }
+
+    // One of the sides is Bounded & the other is Unbounded: do a sideInput join.
+    // When doing a sideInput join, the windowFn does not need to match.
+    // Only INNER JOIN and the OUTER JOIN that preserves the unbounded side are supported.
+    if (joinType == JoinRelType.FULL) {
+      throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
+          + "a bounded table with an unbounded table.");
+    }
+
+    if (joinType == JoinRelType.LEFT && leftBounded) {
+      throw new UnsupportedOperationException(
+          "LEFT side of an OUTER JOIN must be Unbounded table.");
+    }
+
+    if (joinType == JoinRelType.RIGHT && rightBounded) {
+      throw new UnsupportedOperationException(
+          "RIGHT side of a RIGHT OUTER JOIN must be Unbounded table.");
+    }
+
+    return sideInputJoin(extractedLeftRows, extractedRightRows,
+        leftNullRow, rightNullRow);
+  }
+
+  /**
+   * Joins the two keyed inputs with the join library according to {@code joinType} and merges
+   * each joined pair of rows into a single output row.
+   */
+  private PCollection<BeamSqlRow> standardJoin(
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
+    PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows;
+    switch (joinType) {
+      case LEFT:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
+        break;
+      case RIGHT:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
+        break;
+      case FULL:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
+            rightNullRow);
+        break;
+      case INNER:
+      default:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .innerJoin(extractedLeftRows, extractedRightRows);
+        break;
+    }
+
+    return joinedRows
+        .apply(stageName + "_JoinParts2WholeRow",
+            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+  }
+
+  /**
+   * Joins a bounded input with an unbounded one by using the bounded side as a side input.
+   *
+   * <p>The inputs are swapped when the left one is bounded, so that the unbounded input is
+   * always the main input; the {@code swapped} flag is passed on so the result can be
+   * converted back.
+   */
+  public PCollection<BeamSqlRow> sideInputJoin(
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
+    // we always make the Unbounded table on the left to do the sideInput join
+    // (will convert the result accordingly before return)
+    boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
+    JoinRelType realJoinType =
+        (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
+
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
+        swapped ? extractedRightRows : extractedLeftRows;
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
+        swapped ? extractedLeftRows : extractedRightRows;
+    BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
+
+    // swapped still need to pass down because, we need to swap the result back.
+    return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
+        realRightNullRow, swapped);
+  }
+
+  /**
+   * Turns {@code rightRows} into a multimap view and joins {@code leftRows} against it in a
+   * {@code ParDo}.
+   */
+  private PCollection<BeamSqlRow> sideInputJoinHelper(
+      JoinRelType joinType,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
+      BeamSqlRow rightNullRow, boolean swapped) {
+    final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
+        .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
+
+    return leftRows
+        .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
+            joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+  }
+
+  /** Builds a row of the node's row type in which every field is {@code null}. */
+  private BeamSqlRow buildNullRow(BeamRelNode relNode) {
+    BeamSqlRowType nullRowType = CalciteUtils.toBeamRowType(relNode.getRowType());
+    BeamSqlRow nullRow = new BeamSqlRow(nullRowType);
+    for (int i = 0; i < nullRowType.size(); i++) {
+      nullRow.addField(i, null);
+    }
+    return nullRow;
+  }
+
+  /**
+   * Extracts the (left column index, right column index) pairs of the join condition.
+   *
+   * @param leftRowColumnCount number of columns of the left input; it is subtracted from an
+   *     index in the joined row to obtain the index within the right input
+   * @throws UnsupportedOperationException if the condition is a literal (a literal
+   *     {@code true} means CROSS JOIN) or is not an equi-join condition
+   */
+  private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+    if (condition instanceof RexLiteral) {
+      // it's a CROSS JOIN because: condition == true
+      if (Boolean.TRUE.equals(((RexLiteral) condition).getValue())) {
+        throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+      }
+      // Any other literal cannot be cast to a RexCall, so reject it explicitly.
+      throw new UnsupportedOperationException(
+          "Literal join condition " + condition + " is not supported");
+    }
+
+    List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+    collectJoinColumns(condition, leftRowColumnCount, pairs);
+    return pairs;
+  }
+
+  /**
+   * Adds the column pair of every {@code =} comparison found in {@code node} to {@code pairs},
+   * descending through {@code AND} calls and rejecting every other kind of expression.
+   */
+  private void collectJoinColumns(RexNode node, int leftRowColumnCount,
+      List<Pair<Integer, Integer>> pairs) {
+    if (!(node instanceof RexCall)) {
+      throw new UnsupportedOperationException(
+          "Join condition " + node + " is not supported, only equi-join is supported");
+    }
+
+    RexCall call = (RexCall) node;
+    String operatorName = call.getOperator().getName();
+    if ("AND".equals(operatorName)) {
+      // Every operand has to be validated on its own; otherwise a non-equality comparison
+      // inside the AND would silently be treated as an equality.
+      for (RexNode operand : call.getOperands()) {
+        collectJoinColumns(operand, leftRowColumnCount, pairs);
+      }
+    } else if ("=".equals(operatorName)) {
+      pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+    } else {
+      throw new UnsupportedOperationException(
+          "Operator " + operatorName + " is not supported in join condition");
+    }
+  }
+
+  /**
+   * Converts one {@code =} comparison into a (left column index, right column index) pair.
+   *
+   * <p>The smaller input reference index is taken as the left column; the larger one, reduced
+   * by {@code leftRowColumnCount}, as the right column.
+   *
+   * @throws UnsupportedOperationException if an operand is not a plain column reference or if
+   *     the two columns do not come from different inputs
+   */
+  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+      int leftRowColumnCount) {
+    List<RexNode> operands = oneCondition.getOperands();
+    if (operands.size() != 2
+        || !(operands.get(0) instanceof RexInputRef)
+        || !(operands.get(1) instanceof RexInputRef)) {
+      throw new UnsupportedOperationException("Join condition " + oneCondition
+          + " is not supported, both sides of '=' must be column references");
+    }
+
+    final int firstIndex = ((RexInputRef) operands.get(0)).getIndex();
+    final int secondIndex = ((RexInputRef) operands.get(1)).getIndex();
+    final int leftIndex = Math.min(firstIndex, secondIndex);
+    final int rightIndex = Math.max(firstIndex, secondIndex) - leftRowColumnCount;
+
+    if (leftIndex >= leftRowColumnCount || rightIndex < 0) {
+      throw new UnsupportedOperationException("Join condition " + oneCondition
+          + " is not supported, it must compare a left column with a right column");
+    }
+
+    return new Pair<>(leftIndex, rightIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
new file mode 100644
index 0000000..11e4f5e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Convention for Beam SQL.
+ *
+ * <p>A single-instance enum named {@code BEAM_LOGICAL} that reports {@link BeamRelNode} as
+ * its node interface.
+ */
+public enum BeamLogicalConvention implements Convention {
+  INSTANCE;
+
+  /** Returns the fixed name {@code BEAM_LOGICAL}. */
+  @Override
+  public String getName() {
+    return "BEAM_LOGICAL";
+  }
+
+  /** Same text as {@link #getName()}. */
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  /** Returns {@code BeamRelNode.class}. */
+  @Override
+  public Class getInterface() {
+    return BeamRelNode.class;
+  }
+
+  /** Returns {@code ConventionTraitDef.INSTANCE}. */
+  @Override
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  /** Only this very instance satisfies the convention. */
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    return trait == this;
+  }
+
+  /** Nothing is registered with the planner. */
+  @Override
+  public void register(RelOptPlanner planner) {
+  }
+
+  /** Always {@code false}. */
+  @Override
+  public boolean canConvertConvention(Convention toConvention) {
+    return false;
+  }
+
+  /** Always {@code false}. */
+  @Override
+  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
new file mode 100644
index 0000000..0075d3a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Minus} node.
+ *
+ * <p>Corresponds to the SQL {@code EXCEPT} operator.
+ *
+ * <p>Pipeline construction is handed off to a {@code BeamSetOperatorRelBase} created with
+ * {@code OpType.MINUS}.
+ */
+public class BeamMinusRel extends Minus implements BeamRelNode {
+
+  private final BeamSetOperatorRelBase delegate;
+
+  public BeamMinusRel(
+      RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new BeamSetOperatorRelBase(
+        this, BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
+  }
+
+  /** Creates a new {@code BeamMinusRel} on the same cluster with the given arguments. */
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamMinusRel(getCluster(), traitSet, inputs, all);
+  }
+
+  /** Forwards to the delegate's {@code buildBeamPipeline} with the same arguments. */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
new file mode 100644
index 0000000..6ccb156
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Project} node.
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+  /**
+   * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
+   */
+  public BeamProjectRel(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode input,
+      List<? extends RexNode> projects,
+      RelDataType rowType) {
+    super(cluster, traits, input, projects, rowType);
+  }
+
+  /** Creates a new {@code BeamProjectRel} on the same cluster with the given arguments. */
+  @Override
+  public Project copy(
+      RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  /**
+   * Builds the upstream pipeline and applies a {@code BeamSqlProjectFn}, driven by a
+   * {@code BeamSqlFnExecutor} created for this node, to its rows.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    String projectStageName = BeamSqlRelUtils.getStageName(this);
+
+    PCollection<BeamSqlRow> inputRows = BeamSqlRelUtils.getBeamRelInput(getInput())
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+
+    BeamSqlExpressionExecutor expressionExecutor = new BeamSqlFnExecutor(this);
+    BeamSqlProjectFn projectFn = new BeamSqlProjectFn(
+        getRelTypeName(), expressionExecutor, CalciteUtils.toBeamRowType(rowType));
+
+    PCollection<BeamSqlRow> projectedRows =
+        inputRows.apply(projectStageName, ParDo.of(projectFn));
+    // The output rows are encoded with a coder built from this node's row type.
+    projectedRows.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+
+    return projectedRows;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
new file mode 100644
index 0000000..8a51cc7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A {@link RelNode} extended with one method,
+ * {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)}.
+ */
+public interface BeamRelNode extends RelNode {
+
+  /**
+   * Builds the Beam pipeline for this node.
+   *
+   * <p>A {@link BeamRelNode} is a recursive structure; the {@code BeamQueryPlanner} visits it
+   * with a DFS (Depth-First-Search) algorithm.
+   *
+   * @param inputPCollections the collections handed in by the caller
+   * @param sqlEnv the SQL environment available while building
+   * @return the rows produced by this node
+   * @throws Exception if the pipeline cannot be built
+   */
+  PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
new file mode 100644
index 0000000..44e4338
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
+ * and {@code BeamMinusRel}.
+ */
+public class BeamSetOperatorRelBase {
+  /**
+   * The kind of set operation a delegate performs.
+   */
+  public enum OpType implements Serializable {
+    UNION,
+    INTERSECT,
+    MINUS
+  }
+
+  private BeamRelNode beamRelNode;
+  private List<RelNode> inputs;
+  private boolean all;
+  private OpType opType;
+
+  /**
+   * Creates a delegate on behalf of a set-operator node.
+   *
+   * @param beamRelNode the node delegating to this instance; used to derive the stage name
+   * @param opType the set operation to perform
+   * @param inputs the input nodes; only the first two are read
+   * @param all flag passed, together with {@code opType}, to the filtering {@code DoFn}
+   */
+  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
+      List<RelNode> inputs, boolean all) {
+    this.all = all;
+    this.inputs = inputs;
+    this.opType = opType;
+    this.beamRelNode = beamRelNode;
+  }
+
+  /**
+   * Builds both inputs, keys every row by itself, co-groups the two sides and filters the
+   * co-grouped result according to the operator type.
+   *
+   * @param inputPCollections the collections passed on to both input nodes
+   * @param sqlEnv the SQL environment passed on to both input nodes
+   * @return the rows emitted by the filtering step
+   * @throws IllegalArgumentException if the window functions of the two inputs are incompatible
+   * @throws Exception if building either input fails
+   */
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    PCollection<BeamSqlRow> lhs = buildInput(0, inputPCollections, sqlEnv);
+    PCollection<BeamSqlRow> rhs = buildInput(1, inputPCollections, sqlEnv);
+    requireCompatibleWindows(lhs, rhs);
+
+    final TupleTag<BeamSqlRow> lhsTag = new TupleTag<>();
+    final TupleTag<BeamSqlRow> rhsTag = new TupleTag<>();
+
+    // Key each row by itself so that equal rows from both sides meet in the co-group.
+    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> lhsIndexed = lhs.apply(
+        stageName + "_CreateLeftIndex",
+        MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()));
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> rhsIndexed = rhs.apply(
+        stageName + "_CreateRightIndex",
+        MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()));
+
+    PCollection<KV<BeamSqlRow, CoGbkResult>> coGrouped = KeyedPCollectionTuple
+        .of(lhsTag, lhsIndexed)
+        .and(rhsTag, rhsIndexed)
+        .apply(CoGroupByKey.<BeamSqlRow>create());
+
+    return coGrouped.apply(ParDo.of(
+        new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(lhsTag, rhsTag, opType, all)));
+  }
+
+  /** Resolves the input at {@code index} to a {@link BeamRelNode} and builds its rows. */
+  private PCollection<BeamSqlRow> buildInput(int index, PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode node = BeamSqlRelUtils.getBeamRelInput(inputs.get(index));
+    return node.buildBeamPipeline(inputPCollections, sqlEnv);
+  }
+
+  /** Rejects inputs whose window functions are not compatible with each other. */
+  private void requireCompatibleWindows(PCollection<BeamSqlRow> lhs,
+      PCollection<BeamSqlRow> rhs) {
+    WindowFn<?, ?> lhsWindow = lhs.getWindowingStrategy().getWindowFn();
+    WindowFn<?, ?> rhsWindow = rhs.getWindowingStrategy().getWindowFn();
+    if (lhsWindow.isCompatible(rhsWindow)) {
+      return;
+    }
+    throw new IllegalArgumentException(
+        "inputs of " + opType + " have different window strategy: "
+        + lhsWindow + " VS " + rhsWindow);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
new file mode 100644
index 0000000..4ea12ca
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Sort} node.
+ *
+ * <p>Since Beam does not fully support global sort, we are using {@link Top} to implement
+ * the {@code Sort} algebra. The following types of ORDER BY are supported:
+ *
+ * <pre>{@code
+ *     select * from t order by id desc limit 10;
+ *     select * from t order by id desc limit 10, 5;
+ * }</pre>
+ *
+ * <p>But ORDER BY without a LIMIT is NOT supported:
+ *
+ * <pre>{@code
+ *   select * from t order by id desc
+ * }</pre>
+ *
+ * <h3>Constraints</h3>
+ * <ul>
+ *   <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT`
+ *   must fit into the memory of a single machine.</li>
+ *   <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`,
+ *   it does not make much sense to use `ORDER BY` with `WINDOW`.
+ *   </li>
+ * </ul>
+ */
+public class BeamSortRel extends Sort implements BeamRelNode {
+  private List<Integer> fieldIndices = new ArrayList<>();
+  private List<Boolean> orientation = new ArrayList<>();
+  private List<Boolean> nullsFirst = new ArrayList<>();
+
+  private int startIndex = 0;
+  private int count;
+
+  public BeamSortRel(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode child,
+      RelCollation collation,
+      RexNode offset,
+      RexNode fetch) {
+    super(cluster, traits, child, collation, offset, fetch);
+
+    List<RexNode> fieldExps = getChildExps();
+    RelCollationImpl collationImpl = (RelCollationImpl) collation;
+    List<RelFieldCollation> collations = collationImpl.getFieldCollations();
+    for (int i = 0; i < fieldExps.size(); i++) {
+      RexNode fieldExp = fieldExps.get(i);
+      RexInputRef inputRef = (RexInputRef) fieldExp;
+      fieldIndices.add(inputRef.getIndex());
+      orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
+
+      RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
+      if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
+        rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
+      }
+      nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
+    }
+
+    if (fetch == null) {
+      throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
+    }
+
+    RexLiteral fetchLiteral = (RexLiteral) fetch;
+    count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+
+    if (offset != null) {
+      RexLiteral offsetLiteral = (RexLiteral) offset;
+      startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
+    }
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
+    RelNode input = getInput();
+    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+    Type windowType = upstream.getWindowingStrategy().getWindowFn()
+        .getWindowTypeDescriptor().getType();
+    if (!windowType.equals(GlobalWindow.class)) {
+      throw new UnsupportedOperationException(
+          "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
+    }
+
+    BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
+        nullsFirst);
+    // first find the top (offset + count)
+    PCollection<List<BeamSqlRow>> rawStream =
+        upstream.apply("extractTopOffsetAndFetch",
+            Top.of(startIndex + count, comparator).withoutDefaults())
+        .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+
+    // strip the `leading offset`
+    if (startIndex > 0) {
+      rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
+          new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
+          .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+    }
+
+    PCollection<BeamSqlRow> orderedStream = rawStream.apply(
+        "flatten", Flatten.<BeamSqlRow>iterables());
+    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+
+    return orderedStream;
+  }
+
+  private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
+    private int startIndex;
+    private int endIndex;
+
+    public SubListFn(int startIndex, int endIndex) {
+      this.startIndex = startIndex;
+      this.endIndex = endIndex;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().subList(startIndex, endIndex));
+    }
+  }
+
+  @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
+      RexNode offset, RexNode fetch) {
+    return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+  }
+
+  private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
+    private List<Integer> fieldsIndices;
+    private List<Boolean> orientation;
+    private List<Boolean> nullsFirst;
+
+    public BeamSqlRowComparator(List<Integer> fieldsIndices,
+        List<Boolean> orientation,
+        List<Boolean> nullsFirst) {
+      this.fieldsIndices = fieldsIndices;
+      this.orientation = orientation;
+      this.nullsFirst = nullsFirst;
+    }
+
+    @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
+      for (int i = 0; i < fieldsIndices.size(); i++) {
+        int fieldIndex = fieldsIndices.get(i);
+        int fieldRet = 0;
+        SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
+        // whether NULL should be ordered first or last(compared to non-null values) depends on
+        // what user specified in SQL(NULLS FIRST/NULLS LAST)
+        if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
+          continue;
+        } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
+          fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
+        } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
+          fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
+        } else {
+          switch (fieldType) {
+            case TINYINT:
+              fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
+              break;
+            case SMALLINT:
+              fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
+              break;
+            case INTEGER:
+              fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
+              break;
+            case BIGINT:
+              fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
+              break;
+            case FLOAT:
+              fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
+              break;
+            case DOUBLE:
+              fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
+              break;
+            case VARCHAR:
+              fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
+              break;
+            case DATE:
+              fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
+              break;
+            default:
+              throw new UnsupportedOperationException(
+                  "Data type: " + fieldType + " not supported yet!");
+          }
+        }
+
+        fieldRet *= (orientation.get(i) ? -1 : 1);
+        if (fieldRet != 0) {
+          return fieldRet;
+        }
+      }
+      return 0;
+    }
+  }
+
+  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+    return a.compareTo(b);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
new file mode 100644
index 0000000..6467d9f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for {@code BeamRelNode}.
+ */
+class BeamSqlRelUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
+
+  // Counters appended to generated names; each call to the matching getter consumes one value.
+  private static final AtomicInteger sequence = new AtomicInteger(0);
+  private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+  /**
+   * Returns a stage name of the form {@code <UPPER-CASED SIMPLE CLASS NAME>_<id>_<sequence>}.
+   * Every call advances the shared sequence, so two calls never return the same name.
+   */
+  public static String getStageName(BeamRelNode relNode) {
+    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+        + sequence.getAndIncrement();
+  }
+
+  /**
+   * Returns a name of the form
+   * {@code Generated_<UPPER-CASED SIMPLE CLASS NAME>_<id>_<sequence>}, using a sequence that is
+   * separate from the one used by {@link #getStageName(BeamRelNode)}.
+   */
+  public static String getClassName(BeamRelNode relNode) {
+    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+        + "_" + classSequence.getAndIncrement();
+  }
+
+  /**
+   * Casts {@code input} to a {@link BeamRelNode}, first replacing a {@link RelSubset} with the
+   * node returned by {@link RelSubset#getBest()}.
+   */
+  public static BeamRelNode getBeamRelInput(RelNode input) {
+    if (input instanceof RelSubset) {
+      // go with known best input
+      input = ((RelSubset) input).getBest();
+    }
+    return (BeamRelNode) input;
+  }
+
+  /** Explains {@code rel} at the {@link SqlExplainLevel#EXPPLAN_ATTRIBUTES} level. */
+  public static String explain(final RelNode rel) {
+    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+  }
+
+  /**
+   * Renders the plan rooted at {@code rel} as text at the requested level of detail.
+   *
+   * @param rel the root of the plan to render
+   * @param detailLevel the level of detail to render
+   * @return the rendered plan, or an empty string if rendering overflowed the stack
+   */
+  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+    String explain = "";
+    try {
+      // Pass the requested level through; the one-argument overload would ignore it.
+      explain = RelOptUtil.toString(rel, detailLevel);
+    } catch (StackOverflowError e) {
+      // Deliberate best effort: log the failure and return the empty plan.
+      LOG.error("StackOverflowError occurred while extracting plan. "
+          + "Please report it to the dev@ mailing list.");
+      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+      LOG.error("Forcing plan to empty string and continue... "
+          + "SQL Runner may not working properly after.");
+    }
+    return explain;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
new file mode 100644
index 0000000..d35fa67
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Union}.
+ *
+ * <p>{@code BeamUnionRel} requires all of its inputs to have the same {@link WindowFn}. From
+ * the SQL perspective, two cases are supported:
+ *
+ * <p>1) Do not use {@code grouped window function}:
+ *
+ * <pre>{@code
+ *   select * from person UNION select * from person
+ * }</pre>
+ *
+ * <p>2) Use the same {@code grouped window function}, with the same param:
+ * <pre>{@code
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ *   UNION
+ *   select * from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * }</pre>
+ *
+ * <p>Inputs with different group functions are NOT supported:
+ * <pre>{@code
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ *   UNION
+ *   select * from person
+ *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
+ * }</pre>
+ */
+public class BeamUnionRel extends Union implements BeamRelNode {
+  // Performs the actual pipeline construction; set by every constructor.
+  private final BeamSetOperatorRelBase delegate;
+
+  /**
+   * Creates a union node over {@code inputs}.
+   *
+   * @param cluster passed through to {@link Union}
+   * @param traits passed through to {@link Union}
+   * @param inputs the input nodes
+   * @param all passed through to {@link Union} and to the delegate
+   */
+  public BeamUnionRel(RelOptCluster cluster,
+      RelTraitSet traits,
+      List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.UNION,
+        inputs, all);
+  }
+
+  /**
+   * Creates a union node from a serialized {@link RelInput}.
+   *
+   * @param input the serialized form handed to the {@link Union} constructor
+   */
+  public BeamUnionRel(RelInput input) {
+    super(input);
+    // Without a delegate, buildBeamPipeline would fail with a NullPointerException for nodes
+    // created through this constructor; build it from the state the superclass just set up.
+    this.delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.UNION,
+        getInputs(), this.all);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
+  }
+
+  /**
+   * Delegates pipeline construction to the shared set-operator implementation.
+   *
+   * @param inputPCollections passed through to the delegate
+   * @param sqlEnv passed through to the delegate
+   * @return the rows returned by the delegate
+   * @throws Exception if the delegate fails
+   */
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7eb113b3/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
new file mode 100644
index 0000000..f12cbbc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Values} node.
+ *
+ * <p>{@code BeamValuesRel} will be used in the following SQLs:
+ * <ul>
+ *   <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
+ *   <li>{@code select 1, '1', LOCALTIME}</li>
+ * </ul>
+ */
+public class BeamValuesRel extends Values implements BeamRelNode {
+
+  public BeamValuesRel(
+      RelOptCluster cluster,
+      RelDataType rowType,
+      ImmutableList<ImmutableList<RexLiteral>> tuples,
+      RelTraitSet traits) {
+    super(cluster, rowType, tuples, traits);
+
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
+    List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
+    String stageName = BeamSqlRelUtils.getStageName(this);
+    if (tuples.isEmpty()) {
+      throw new IllegalStateException("Values with empty tuples!");
+    }
+
+    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
+    for (ImmutableList<RexLiteral> tuple : tuples) {
+      BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
+      for (int i = 0; i < tuple.size(); i++) {
+        BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
+      }
+      rows.add(row);
+    }
+
+    return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
+        .setCoder(new BeamSqlRowCoder(beamSQLRowType));
+  }
+}


Mime
View raw message