beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-3181) [Nexmark][SQL] Implement a basic pass-through SQL query
Date Wed, 03 Jan 2018 18:07:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-3181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310008#comment-16310008 ] 

ASF GitHub Bot commented on BEAM-3181:
--------------------------------------

kennknowles closed pull request #4128: [BEAM-3181][Nexmark][SQL] Implement query0
URL: https://github.com/apache/beam/pull/4128
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index 327dbecb1d5..6b76fa91823 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -26,6 +26,7 @@ dependencies {
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-io-parent:beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-extensions-parent:beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
+  shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-extensions-parent:beam-sdks-java-extensions-sql", configuration: "shadow")
   shadow library.java.google_api_services_bigquery
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
@@ -36,6 +37,7 @@ dependencies {
   shadow library.java.findbugs_jsr305
   shadow library.java.junit
   shadow library.java.hamcrest_core
+  shadow library.java.commons_lang3
   shadow project(path: ":beam-runners-parent:beam-runners-direct-java", configuration: "shadow")
   shadow library.java.slf4j_jdk14
   testCompile library.java.hamcrest_core
@@ -46,4 +48,8 @@ task packageTests(type: Jar) {
   classifier = "tests"
 }
 
+test {
+  jvmArgs "-da"
+}
+
 artifacts.archives packageTests
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index 15f51049ec9..b10fa69c3ab 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -163,6 +163,14 @@
           </execution>
         </executions>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
@@ -170,6 +178,7 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
+      <scope>compile</scope>
     </dependency>
 
     <dependency>
@@ -261,5 +270,19 @@
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-sql</artifactId>
+      <scope>compile</scope>
+      <version>${parent.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${apache.commons.lang.version}</version>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 550fbd2ce84..a4b5c5aaf5c 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -25,6 +25,7 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,7 +34,9 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.CoderException;
@@ -77,6 +80,8 @@
 import org.apache.beam.sdk.nexmark.queries.Query8Model;
 import org.apache.beam.sdk.nexmark.queries.Query9;
 import org.apache.beam.sdk.nexmark.queries.Query9Model;
+import org.apache.beam.sdk.nexmark.queries.sql.NexmarkSqlQuery;
+import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -86,6 +91,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+
 import org.joda.time.Duration;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +99,14 @@
  * Run a single Nexmark query using a given configuration.
  */
 public class NexmarkLauncher<OptionT extends NexmarkOptions> {
+
   private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
+
+  /**
+   * Command line parameter value for query language.
+   */
+  private static final String SQL = "sql";
+
   /**
    * Minimum number of samples needed for 'steady-state' rate calculation.
    */
@@ -1056,37 +1069,10 @@ public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
         return null;
       }
 
-      List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration),
-                                                 new Query1(configuration),
-                                                 new Query2(configuration),
-                                                 new Query3(configuration),
-                                                 new Query4(configuration),
-                                                 new Query5(configuration),
-                                                 new Query6(configuration),
-                                                 new Query7(configuration),
-                                                 new Query8(configuration),
-                                                 new Query9(configuration),
-                                                 new Query10(configuration),
-                                                 new Query11(configuration),
-                                                 new Query12(configuration));
-      NexmarkQuery query = queries.get(configuration.query);
+      NexmarkQuery query = getNexmarkQuery();
       queryName = query.getName();
 
-      List<NexmarkQueryModel> models = Arrays.asList(
-          new Query0Model(configuration),
-          new Query1Model(configuration),
-          new Query2Model(configuration),
-          new Query3Model(configuration),
-          new Query4Model(configuration),
-          new Query5Model(configuration),
-          new Query6Model(configuration),
-          new Query7Model(configuration),
-          new Query8Model(configuration),
-          new Query9Model(configuration),
-          null,
-          null,
-          null);
-      NexmarkQueryModel model = models.get(configuration.query);
+      NexmarkQueryModel model = getNexmarkQueryModel();
 
       if (options.getJustModelResultRate()) {
         if (model == null) {
@@ -1154,4 +1140,81 @@ public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
       queryName = null;
     }
   }
+
+  private boolean isSql() {
+    return SQL.equalsIgnoreCase(options.getQueryLanguage());
+  }
+
+  private NexmarkQueryModel getNexmarkQueryModel() {
+    List<NexmarkQueryModel> models = createQueryModels();
+    return models.get(configuration.query);
+  }
+
+  private NexmarkQuery getNexmarkQuery() {
+    List<NexmarkQuery> queries = createQueries();
+
+    if (options.getQuery() >= queries.size()) {
+      throw new UnsupportedOperationException(
+          "Query " + options.getQuery()
+          + " is not implemented yet");
+    }
+
+    return queries.get(configuration.query);
+  }
+
+  private List<NexmarkQueryModel> createQueryModels() {
+    return isSql()
+        ? createSqlQueryModels()
+        : createJavaQueryModels();
+  }
+
+  private List<NexmarkQueryModel> createSqlQueryModels() {
+    return Arrays.asList(
+        null, null, null, null, null, null, null, null, null, null, null, null);
+  }
+
+  private List<NexmarkQueryModel> createJavaQueryModels() {
+    return Arrays.asList(
+            new Query0Model(configuration),
+            new Query1Model(configuration),
+            new Query2Model(configuration),
+            new Query3Model(configuration),
+            new Query4Model(configuration),
+            new Query5Model(configuration),
+            new Query6Model(configuration),
+            new Query7Model(configuration),
+            new Query8Model(configuration),
+            new Query9Model(configuration),
+            null,
+            null,
+            null);
+  }
+
+  private List<NexmarkQuery> createQueries() {
+    return isSql()
+        ? createSqlQueries()
+        : createJavaQueries();
+  }
+
+  private List<NexmarkQuery> createSqlQueries() {
+    return Arrays.<NexmarkQuery> asList(
+        new NexmarkSqlQuery(configuration, new SqlQuery0()));
+  }
+
+  private List<NexmarkQuery> createJavaQueries() {
+    return Arrays.asList(
+        new Query0(configuration),
+        new Query1(configuration),
+        new Query2(configuration),
+        new Query3(configuration),
+        new Query4(configuration),
+        new Query5(configuration),
+        new Query6(configuration),
+        new Query7(configuration),
+        new Query8(configuration),
+        new Query9(configuration),
+        new Query10(configuration),
+        new Query11(configuration),
+        new Query12(configuration));
+  }
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 2a2a5a782a6..b9c8861b680 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.nexmark;
 
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
@@ -400,4 +401,10 @@
   Long getWatermarkValidationDelaySeconds();
 
   void setWatermarkValidationDelaySeconds(Long value);
+
+  @Description("Specify 'sql' to use Beam SQL queries. Otherwise Java transforms will be used")
+  @Nullable
+  String getQueryLanguage();
+
+  void setQueryLanguage(String value);
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
index 6a37ade0162..32d7e22ff6c 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
@@ -80,14 +80,14 @@ public Auction decode(
 
   /** Extra auction properties. */
   @JsonProperty
-  private final String itemName;
+  public final String itemName;
 
   @JsonProperty
-  private final String description;
+  public final String description;
 
   /** Initial bid price, in cents. */
   @JsonProperty
-  private final long initialBid;
+  public final long initialBid;
 
   /** Reserve price, in cents. */
   @JsonProperty
@@ -110,7 +110,7 @@ public Auction decode(
 
   /** Additional arbitrary payload for performance testing. */
   @JsonProperty
-  private final String extra;
+  public final String extra;
 
 
   // For Avro only.
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
index 800f937eade..a0c78eb0af5 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
@@ -75,10 +75,10 @@ public Person decode(InputStream inStream)
   public final String name;
 
   @JsonProperty
-  private final String emailAddress;
+  public final String emailAddress;
 
   @JsonProperty
-  private final String creditCard;
+  public final String creditCard;
 
   @JsonProperty
   public final String city;
@@ -91,7 +91,7 @@ public Person decode(InputStream inStream)
 
   /** Additional arbitrary payload for performance testing. */
   @JsonProperty
-  private final String extra;
+  public final String extra;
 
   // For Avro only.
   @SuppressWarnings("unused")
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSize.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSize.java
new file mode 100644
index 00000000000..e0a5f3c4c96
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSize.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Types;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * {@link KnownSize} implementation to estimate the size of a {@link BeamRecord},
+ * similar to Java model. NexmarkLauncher/Queries infrastructure expects the events to
+ * be able to quickly provide the estimates of their sizes.
+ *
+ * <p>The {@link BeamRecord} size is calculated at creation time.
+ *
+ * <p>Field sizes are the sizes of the Java types described in {@link BeamRecordSqlType}, except
+ * strings, which are assumed to take 1 byte per character plus 1 byte for the size.
+ */
+public class BeamRecordSize implements KnownSize {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  public static final Coder<BeamRecordSize> CODER = new CustomCoder<BeamRecordSize>() {
+    @Override
+    public void encode(BeamRecordSize beamRecordSize, OutputStream outStream)
+        throws CoderException, IOException {
+
+      LONG_CODER.encode(beamRecordSize.sizeInBytes(), outStream);
+    }
+
+    @Override
+    public BeamRecordSize decode(InputStream inStream) throws CoderException, IOException {
+      return new BeamRecordSize(LONG_CODER.decode(inStream));
+    }
+  };
+
+  private static final Map<Integer, Integer> ESTIMATED_FIELD_SIZES =
+      ImmutableMap.<Integer, Integer>builder()
+          .put(Types.TINYINT, bytes(Byte.SIZE))
+          .put(Types.SMALLINT, bytes(Short.SIZE))
+          .put(Types.INTEGER, bytes(Integer.SIZE))
+          .put(Types.BIGINT, bytes(Long.SIZE))
+          .put(Types.FLOAT, bytes(Float.SIZE))
+          .put(Types.DOUBLE, bytes(Double.SIZE))
+          .put(Types.DECIMAL, 32)
+          .put(Types.BOOLEAN, 1)
+          .put(Types.TIME, bytes(Long.SIZE))
+          .put(Types.DATE, bytes(Long.SIZE))
+          .put(Types.TIMESTAMP, bytes(Long.SIZE))
+          .build();
+
+  public static ParDo.SingleOutput<BeamRecord, BeamRecordSize> parDo() {
+    return ParDo.of(new DoFn<BeamRecord, BeamRecordSize>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(BeamRecordSize.of(c.element()));
+      }
+    });
+  }
+
+  public static BeamRecordSize of(BeamRecord beamRecord) {
+    return new BeamRecordSize(sizeInBytes(beamRecord));
+  }
+
+  private static long sizeInBytes(BeamRecord beamRecord) {
+    BeamRecordSqlType recordType = (BeamRecordSqlType) beamRecord.getDataType();
+    long size = 1; // nulls bitset
+
+    for (int fieldIndex = 0; fieldIndex < recordType.getFieldCount(); fieldIndex++) {
+      Integer fieldType = recordType.getFieldTypeByIndex(fieldIndex);
+
+      Integer estimatedSize = ESTIMATED_FIELD_SIZES.get(fieldType);
+
+      if (estimatedSize != null) {
+        size += estimatedSize;
+        continue;
+      }
+
+      if (isString(fieldType)) {
+        size += beamRecord.getString(fieldIndex).length() + 1;
+        continue;
+      }
+
+      throw new IllegalStateException("Unexpected field type " + fieldType);
+    }
+
+    return size;
+  }
+
+  private long sizeInBytes;
+
+  private BeamRecordSize(long sizeInBytes) {
+    this.sizeInBytes = sizeInBytes;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return sizeInBytes;
+  }
+
+  private static boolean isString(Integer fieldType) {
+    return fieldType == Types.CHAR || fieldType == Types.VARCHAR;
+  }
+
+  private static Integer bytes(int size) {
+    return size / Byte.SIZE;
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecord.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecord.java
new file mode 100644
index 00000000000..942bb50d894
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecord.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql;
+
+import java.util.Map;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelFieldsAdapter;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * Convert Java model object to BeamRecord.
+ */
+public class ToBeamRecord {
+
+  static final ToBeamRecord INSTANCE = new ToBeamRecord(ModelAdaptersMapping.ADAPTERS);
+
+  private Map<Class, ModelFieldsAdapter> modelTypeAdapters;
+
+  private ToBeamRecord(Map<Class, ModelFieldsAdapter> modelTypeAdapters) {
+    this.modelTypeAdapters = modelTypeAdapters;
+  }
+
+  private BeamRecord toRecord(Event event) {
+    if (event == null) {
+      return null;
+    }
+
+    KnownSize model = getModel(event);
+    Class modelClass = model.getClass();
+
+    if (!modelTypeAdapters.containsKey(modelClass)) {
+      throw new IllegalArgumentException(
+          "Beam SQL record type adapter is not registered for " + model.getClass().getSimpleName());
+    }
+
+    ModelFieldsAdapter adapter = modelTypeAdapters.get(modelClass);
+
+    return new BeamRecord(adapter.getRecordType(), adapter.getFieldsValues(model));
+  }
+
+  private KnownSize getModel(Event event) {
+    if (event.newAuction != null) {
+      return event.newAuction;
+    } else if (event.newPerson != null) {
+      return event.newPerson;
+    } else if (event.bid != null) {
+      return event.bid;
+    }
+
+    throw new IllegalStateException("Unsupported event type " + event);
+  }
+
+  public static ParDo.SingleOutput<Event, BeamRecord> parDo() {
+    return ParDo.of(new DoFn<Event, BeamRecord>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        BeamRecord beamRecord = INSTANCE.toRecord(c.element());
+        c.output(beamRecord);
+      }
+    });
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMapping.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMapping.java
new file mode 100644
index 00000000000..8882708d13d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMapping.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Person;
+
+/**
+ * Maps Java model classes to Beam SQL record types.
+ */
+public class ModelAdaptersMapping {
+
+  public static final Map<Class, ModelFieldsAdapter> ADAPTERS =
+      ImmutableMap.<Class, ModelFieldsAdapter>builder()
+          .put(Auction.class, auctionAdapter())
+          .put(Bid.class, bidAdapter())
+          .put(Person.class, personAdapter())
+          .build();
+
+  private static ModelFieldsAdapter<Person> personAdapter() {
+    return new ModelFieldsAdapter<Person>(
+        BeamRecordSqlType.builder()
+            .withBigIntField("id")
+            .withVarcharField("name")
+            .withVarcharField("emailAddress")
+            .withVarcharField("creditCard")
+            .withVarcharField("city")
+            .withVarcharField("state")
+            .withBigIntField("dateTime")
+            .withVarcharField("extra")
+            .build()) {
+      @Override
+      public List<Object> getFieldsValues(Person p) {
+        return Collections.<Object> unmodifiableList(Arrays.asList(
+            p.id,
+            p.name,
+            p.emailAddress,
+            p.creditCard,
+            p.city,
+            p.state,
+            p.dateTime,
+            p.extra
+        ));
+      }
+    };
+  }
+
+  private static ModelFieldsAdapter<Bid> bidAdapter() {
+    return new ModelFieldsAdapter<Bid>(
+        BeamRecordSqlType.builder()
+            .withBigIntField("auction")
+            .withBigIntField("bidder")
+            .withBigIntField("price")
+            .withBigIntField("dateTime")
+            .withVarcharField("extra")
+            .build()) {
+      @Override
+      public List<Object> getFieldsValues(Bid b) {
+        return Collections.<Object> unmodifiableList(Arrays.asList(
+            b.auction,
+            b.bidder,
+            b.price,
+            b.dateTime,
+            b.extra
+        ));
+      }
+    };
+  }
+
+  private static ModelFieldsAdapter<Auction> auctionAdapter() {
+    return new ModelFieldsAdapter<Auction>(
+        BeamRecordSqlType.builder()
+            .withBigIntField("id")
+            .withVarcharField("itemName")
+            .withVarcharField("description")
+            .withBigIntField("initialBid")
+            .withBigIntField("reserve")
+            .withBigIntField("dateTime")
+            .withBigIntField("expires")
+            .withBigIntField("seller")
+            .withBigIntField("category")
+            .withVarcharField("extra")
+            .build()) {
+      @Override
+      public List<Object> getFieldsValues(Auction a) {
+        return Collections.<Object>unmodifiableList(Arrays.asList(
+            a.id,
+            a.itemName,
+            a.description,
+            a.initialBid,
+            a.reserve,
+            a.dateTime,
+            a.expires,
+            a.seller,
+            a.category,
+            a.extra
+        ));
+      }
+    };
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelFieldsAdapter.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelFieldsAdapter.java
new file mode 100644
index 00000000000..cf43cc32489
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelFieldsAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
+
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Helper class that maps Java model fields to Beam SQL record type fields.
+ */
+public abstract class ModelFieldsAdapter<T> {
+
+  private BeamRecordSqlType recordType;
+
+  ModelFieldsAdapter(BeamRecordSqlType recordType) {
+    this.recordType = recordType;
+  }
+
+  public BeamRecordType getRecordType() {
+    return recordType;
+  }
+
+  public abstract List<Object> getFieldsValues(T model);
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
new file mode 100644
index 00000000000..b9554a8a92e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * org.apache.beam.sdk.nexmark.model.sql.adapter.
+ */
+
+/**
+ * Model adapters which define the mapping from a specific Java model to a BeamRecord.
+ */
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
new file mode 100644
index 00000000000..3b1b5c131c2
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * org.apache.beam.sdk.nexmark.model.sql.
+ */
+
+/**
+ * Java model conversion to Beam SQL model.
+ */
+package org.apache.beam.sdk.nexmark.model.sql;
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
index d070058dff9..2d7ca87044e 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
@@ -36,6 +36,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
+
 import org.joda.time.Instant;
 
 /**
@@ -83,7 +84,7 @@ public void processElement(ProcessContext c) {
   };
 
   /** Predicate to detect a new bid event. */
-  private static final SerializableFunction<Event, Boolean> IS_BID =
+  public static final SerializableFunction<Event, Boolean> IS_BID =
       new SerializableFunction<Event, Boolean>() {
         @Override
         public Boolean apply(Event event) {
@@ -211,7 +212,7 @@ public void processElement(ProcessContext c) {
   private final Monitor<Event> endOfStreamMonitor;
   private final Counter fatalCounter;
 
-  NexmarkQuery(NexmarkConfiguration configuration, String name) {
+  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
     super(name);
     this.configuration = configuration;
     if (configuration.debug) {
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
new file mode 100644
index 00000000000..229ed425a69
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.queries.sql;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.sql.BeamRecordSize;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Executor for Nexmark queries. Allows decoupling from NexmarkQuery
+ * and testing independently.
+ */
+public class NexmarkSqlQuery extends NexmarkQuery {
+
+  private PTransform<PCollection<Event>, PCollection<BeamRecord>> queryTransform;
+
+  public NexmarkSqlQuery(NexmarkConfiguration configuration,
+                         PTransform<PCollection<Event>, PCollection<BeamRecord>> queryTransform) {
+    super(configuration, queryTransform.getName());
+    this.queryTransform = queryTransform;
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<BeamRecord> queryResults = events.apply(queryTransform);
+
+    PCollection<? extends KnownSize> resultRecordsSizes = queryResults
+        .apply(BeamRecordSize.parDo())
+        .setCoder(BeamRecordSize.CODER);
+
+    return NexmarkUtils.castToKnownSize(name, resultRecordsSizes);
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
new file mode 100644
index 00000000000..17e5f0cefcf
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.queries.sql;
+
+import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.sql.ToBeamRecord;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 0: Pass events through unchanged.
+ *
+ * <p>This measures the overhead of the Beam SQL implementation and test harness like
+ * conversion from Java model classes to Beam records.
+ *
+ * <p>{@link Bid} events are used here at the moment, as they are most numerous
+ * with default configuration.
+ */
+public class SqlQuery0 extends PTransform<PCollection<Event>, PCollection<BeamRecord>> {
+
+  private static final BeamSql.SimpleQueryTransform QUERY =
+      BeamSql.query("SELECT * FROM PCOLLECTION");
+
+  public SqlQuery0() {
+    super("SqlQuery0");
+  }
+
+  @Override
+  public PCollection<BeamRecord> expand(PCollection<Event> allEvents) {
+
+    BeamRecordCoder bidRecordCoder = getBidRecordCoder();
+
+    PCollection<BeamRecord> bidEventsRecords = allEvents
+        .apply(Filter.by(IS_BID))
+        .apply(ToBeamRecord.parDo())
+        .apply(getName() + ".Serialize", logBytesMetric(bidRecordCoder))
+        .setCoder(bidRecordCoder);
+
+    return bidEventsRecords.apply(QUERY).setCoder(bidRecordCoder);
+  }
+
+  private PTransform<? super PCollection<BeamRecord>, PCollection<BeamRecord>> logBytesMetric(
+      final BeamRecordCoder coder) {
+
+    return ParDo.of(new DoFn<BeamRecord, BeamRecord>() {
+      private final Counter bytesMetric = Metrics.counter(name , "bytes");
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws CoderException, IOException {
+        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        coder.encode(c.element(), outStream, Coder.Context.OUTER);
+        byte[] byteArray = outStream.toByteArray();
+        bytesMetric.inc((long) byteArray.length);
+        ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
+        BeamRecord record = coder.decode(inStream, Coder.Context.OUTER);
+        c.output(record);
+      }
+    });
+  }
+
+  private BeamRecordCoder getBidRecordCoder() {
+    return ModelAdaptersMapping.ADAPTERS.get(Bid.class).getRecordType().getRecordCoder();
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java
new file mode 100644
index 00000000000..f6a51fb7ee1
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * org.apache.beam.sdk.nexmark.queries.sql.
+ */
+
+/**
+ * Beam SQL Nexmark queries.
+ */
+package org.apache.beam.sdk.nexmark.queries.sql;
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSizeTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSizeTest.java
new file mode 100644
index 00000000000..7304dc9a07a
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSizeTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql;
+
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamRecordSize}.
+ */
+public class BeamRecordSizeTest {
+
+  private static final BeamRecordSqlType RECORD_TYPE = BeamRecordSqlType.builder()
+      .withTinyIntField("f_tinyint")
+      .withSmallIntField("f_smallint")
+      .withIntegerField("f_int")
+      .withBigIntField("f_bigint")
+      .withFloatField("f_float")
+      .withDoubleField("f_double")
+      .withDecimalField("f_decimal")
+      .withBooleanField("f_boolean")
+      .withTimeField("f_time")
+      .withDateField("f_date")
+      .withTimestampField("f_timestamp")
+      .withCharField("f_char")
+      .withVarcharField("f_varchar")
+      .build();
+
+  private static final List<Object> VALUES = ImmutableList.<Object>of(
+      (byte) 1,
+      (short) 2,
+      (int) 3,
+      (long) 4,
+      (float) 5.12,
+      (double) 6.32,
+      new BigDecimal(7),
+      false,
+      new GregorianCalendar(2019, 03, 02),
+      new Date(10L),
+      new Date(11L),
+      "12",
+      "13");
+
+  private static final long RECORD_SIZE = 91L;
+
+  private static final BeamRecord RECORD = new BeamRecord(RECORD_TYPE, VALUES);
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testCalculatesCorrectSize() throws Exception {
+    assertEquals(RECORD_SIZE, BeamRecordSize.of(RECORD).sizeInBytes());
+  }
+
+  @Test
+  public void testParDoConvertsToRecordSize() throws Exception {
+    PCollection<BeamRecord> records = testPipeline.apply(
+        TestStream.create(RECORD_TYPE.getRecordCoder())
+            .addElements(RECORD)
+            .advanceWatermarkToInfinity());
+
+    PAssert
+        .that(records)
+        .satisfies(new CorrectSize());
+
+    testPipeline.run();
+  }
+
+  static class CorrectSize implements SerializableFunction<Iterable<BeamRecord>, Void> {
+    @Override
+    public Void apply(Iterable<BeamRecord> input) {
+      BeamRecordSize recordSize = BeamRecordSize.of(Iterables.getOnlyElement(input));
+      assertThat(recordSize.sizeInBytes(), equalTo(RECORD_SIZE));
+      return null;
+    }
+  }
+}
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecordTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecordTest.java
new file mode 100644
index 00000000000..c13138d85cf
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecordTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql;
+
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link ToBeamRecord}.
+ */
+public class ToBeamRecordTest {
+  private static final Person PERSON =
+      new Person(3L, "name", "email", "cc", "city", "state", 329823L, "extra");
+
+  private static final Bid BID =
+      new Bid(5L, 3L, 123123L, 43234234L, "extra2");
+
+  private static final Auction AUCTION =
+      new Auction(5L, "item", "desc", 342L, 321L, 3423342L, 2349234L, 3L, 1L, "extra3");
+
+  @Rule
+  public TestPipeline testPipeline = TestPipeline.create();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testConvertsBids() throws Exception {
+    PCollection<Event> bids = testPipeline.apply(
+        TestStream.create(Event.CODER)
+            .addElements(new Event(BID))
+            .advanceWatermarkToInfinity());
+
+    BeamRecord expectedBidRecord =
+        new BeamRecord(
+            ModelAdaptersMapping.ADAPTERS.get(Bid.class).getRecordType(),
+            ModelAdaptersMapping.ADAPTERS.get(Bid.class).getFieldsValues(BID));
+
+    PAssert
+        .that(bids.apply(ToBeamRecord.parDo()))
+        .containsInAnyOrder(expectedBidRecord);
+
+    testPipeline.run();
+  }
+
+  @Test
+  public void testConvertsPeople() throws Exception {
+    PCollection<Event> people = testPipeline.apply(
+        TestStream.create(Event.CODER)
+            .addElements(new Event(PERSON))
+            .advanceWatermarkToInfinity());
+
+    BeamRecord expectedPersonRecord =
+        new BeamRecord(
+            ModelAdaptersMapping.ADAPTERS.get(Person.class).getRecordType(),
+            ModelAdaptersMapping.ADAPTERS.get(Person.class).getFieldsValues(PERSON));
+
+    PAssert
+        .that(people.apply(ToBeamRecord.parDo()))
+        .containsInAnyOrder(expectedPersonRecord);
+
+    testPipeline.run();
+  }
+
+  @Test
+  public void testConvertsAuctions() throws Exception {
+    PCollection<Event> auctions = testPipeline.apply(
+        TestStream.create(Event.CODER)
+            .addElements(new Event(AUCTION))
+            .advanceWatermarkToInfinity());
+
+    BeamRecord expectedAuctionRecord =
+        new BeamRecord(
+            ModelAdaptersMapping.ADAPTERS.get(Auction.class).getRecordType(),
+            ModelAdaptersMapping.ADAPTERS.get(Auction.class).getFieldsValues(AUCTION));
+
+    PAssert
+        .that(auctions.apply(ToBeamRecord.parDo()))
+        .containsInAnyOrder(expectedAuctionRecord);
+
+    testPipeline.run();
+  }
+}
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMappingTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMappingTest.java
new file mode 100644
index 00000000000..1eccfffc804
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMappingTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ModelAdaptersMapping}.
+ */
+public class ModelAdaptersMappingTest {
+
+  private static final Person PERSON =
+      new Person(3L, "name", "email", "cc", "city", "state", 329823L, "extra");
+
+  private static final BeamRecordSqlType PERSON_RECORD_TYPE = BeamRecordSqlType.builder()
+      .withBigIntField("id")
+      .withVarcharField("name")
+      .withVarcharField("emailAddress")
+      .withVarcharField("creditCard")
+      .withVarcharField("city")
+      .withVarcharField("state")
+      .withBigIntField("dateTime")
+      .withVarcharField("extra")
+      .build();
+
+  private static final Bid BID =
+      new Bid(5L, 3L, 123123L, 43234234L, "extra2");
+
+  private static final BeamRecordSqlType BID_RECORD_TYPE = BeamRecordSqlType.builder()
+      .withBigIntField("auction")
+      .withBigIntField("bidder")
+      .withBigIntField("price")
+      .withBigIntField("dateTime")
+      .withVarcharField("extra")
+      .build();
+
+  private static final Auction AUCTION =
+      new Auction(5L, "item", "desc", 342L, 321L, 3423342L, 2349234L, 3L, 1L, "extra3");
+
+  private static final BeamRecordSqlType AUCTION_RECORD_TYPE = BeamRecordSqlType.builder()
+      .withBigIntField("id")
+      .withVarcharField("itemName")
+      .withVarcharField("description")
+      .withBigIntField("initialBid")
+      .withBigIntField("reserve")
+      .withBigIntField("dateTime")
+      .withBigIntField("expires")
+      .withBigIntField("seller")
+      .withBigIntField("category")
+      .withVarcharField("extra")
+      .build();
+
+  @Test
+  public void hasAdaptersForSupportedModels() throws Exception {
+    assertTrue(ModelAdaptersMapping.ADAPTERS.containsKey(Bid.class));
+    assertTrue(ModelAdaptersMapping.ADAPTERS.containsKey(Person.class));
+    assertTrue(ModelAdaptersMapping.ADAPTERS.containsKey(Auction.class));
+
+    assertNotNull(ModelAdaptersMapping.ADAPTERS.get(Bid.class));
+    assertNotNull(ModelAdaptersMapping.ADAPTERS.get(Person.class));
+    assertNotNull(ModelAdaptersMapping.ADAPTERS.get(Auction.class));
+  }
+
+  @Test
+  public void testBidAdapterRecordType() {
+    ModelFieldsAdapter<Person> adapter = ModelAdaptersMapping.ADAPTERS.get(Bid.class);
+
+    BeamRecordSqlType bidRecordType = (BeamRecordSqlType) adapter.getRecordType();
+
+    assertEquals(BID_RECORD_TYPE.getFieldNames(), bidRecordType.getFieldNames());
+    assertEquals(BID_RECORD_TYPE.getFieldTypes(), bidRecordType.getFieldTypes());
+  }
+
+  @Test
+  public void testPersonAdapterRecordType() {
+    ModelFieldsAdapter<Person> adapter = ModelAdaptersMapping.ADAPTERS.get(Person.class);
+
+    BeamRecordSqlType personRecordType = (BeamRecordSqlType) adapter.getRecordType();
+
+    assertEquals(PERSON_RECORD_TYPE.getFieldNames(), personRecordType.getFieldNames());
+    assertEquals(PERSON_RECORD_TYPE.getFieldTypes(), personRecordType.getFieldTypes());
+  }
+
+  @Test
+  public void testAuctionAdapterRecordType() {
+    ModelFieldsAdapter<Person> adapter = ModelAdaptersMapping.ADAPTERS.get(Auction.class);
+
+    BeamRecordSqlType auctionRecordType = (BeamRecordSqlType) adapter.getRecordType();
+
+    assertEquals(AUCTION_RECORD_TYPE.getFieldNames(), auctionRecordType.getFieldNames());
+    assertEquals(AUCTION_RECORD_TYPE.getFieldTypes(), auctionRecordType.getFieldTypes());
+  }
+
+  @Test
+  public void testPersonAdapterGetsFieldValues() throws Exception {
+    ModelFieldsAdapter<Person> adapter = ModelAdaptersMapping.ADAPTERS.get(Person.class);
+    List<Object> values = adapter.getFieldsValues(PERSON);
+    assertEquals(PERSON.id, values.get(0));
+    assertEquals(PERSON.name, values.get(1));
+    assertEquals(PERSON.emailAddress, values.get(2));
+    assertEquals(PERSON.creditCard, values.get(3));
+    assertEquals(PERSON.city, values.get(4));
+    assertEquals(PERSON.state, values.get(5));
+    assertEquals(PERSON.dateTime, values.get(6));
+    assertEquals(PERSON.extra, values.get(7));
+  }
+
+  @Test
+  public void testBidAdapterGetsFieldValues() throws Exception {
+    ModelFieldsAdapter<Bid> adapter = ModelAdaptersMapping.ADAPTERS.get(Bid.class);
+    List<Object> values = adapter.getFieldsValues(BID);
+    assertEquals(BID.auction, values.get(0));
+    assertEquals(BID.bidder, values.get(1));
+    assertEquals(BID.price, values.get(2));
+    assertEquals(BID.dateTime, values.get(3));
+    assertEquals(BID.extra, values.get(4));
+  }
+
+  @Test
+  public void testAuctionAdapterGetsFieldValues() throws Exception {
+    ModelFieldsAdapter<Auction> adapter = ModelAdaptersMapping.ADAPTERS.get(Auction.class);
+    List<Object> values = adapter.getFieldsValues(AUCTION);
+    assertEquals(AUCTION.id, values.get(0));
+    assertEquals(AUCTION.itemName, values.get(1));
+    assertEquals(AUCTION.description, values.get(2));
+    assertEquals(AUCTION.initialBid, values.get(3));
+    assertEquals(AUCTION.reserve, values.get(4));
+    assertEquals(AUCTION.dateTime, values.get(5));
+    assertEquals(AUCTION.expires, values.get(6));
+    assertEquals(AUCTION.seller, values.get(7));
+    assertEquals(AUCTION.category, values.get(8));
+    assertEquals(AUCTION.extra, values.get(9));
+  }
+}
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java
new file mode 100644
index 00000000000..d5889d8903c
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.queries.sql;
+
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelFieldsAdapter;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SqlQuery0}.
+ */
+public class SqlQuery0Test {
+
+  private static final Bid BID1 =
+      new Bid(5L, 3L, 123123L, 43234234L, "extra1");
+
+  private static final Bid BID2 =
+      new Bid(6L, 4L, 134123L, 13234234L, "extra2");
+
+  private static final ModelFieldsAdapter<Bid> BID_ADAPTER =
+      ModelAdaptersMapping.ADAPTERS.get(Bid.class);
+
+  private static final BeamRecord BID1_RECORD =
+      new BeamRecord(BID_ADAPTER.getRecordType(), BID_ADAPTER.getFieldsValues(BID1));
+
+  private static final BeamRecord BID2_RECORD =
+      new BeamRecord(BID_ADAPTER.getRecordType(), BID_ADAPTER.getFieldsValues(BID2));
+
+  @Rule
+  public TestPipeline testPipeline = TestPipeline.create();
+
+  @Test
+  public void testPassesBidsThrough() throws Exception {
+    PCollection<Event> bids = testPipeline.apply(
+        TestStream.create(Event.CODER)
+            .addElements(new Event(BID1))
+            .addElements(new Event(BID2))
+            .advanceWatermarkToInfinity());
+
+    PAssert
+        .that(bids.apply(new SqlQuery0()))
+        .containsInAnyOrder(BID1_RECORD, BID2_RECORD);
+
+    testPipeline.run();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [Nexmark][SQL] Implement a basic pass-through SQL query
> -------------------------------------------------------
>
>                 Key: BEAM-3181
>                 URL: https://issues.apache.org/jira/browse/BEAM-3181
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>
> Implement a basic query to make sure some scaffolding is in place, such as conversion from Java models to BeamRecords and then to KnownSize.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message