Date: Wed, 3 Jan 2018 18:07:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Subject: [jira] [Commented] (BEAM-3181) [Nexmark][SQL] Implement a basic pass-through SQL query

    [ https://issues.apache.org/jira/browse/BEAM-3181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310008#comment-16310008 ]

ASF GitHub Bot commented on BEAM-3181:
--------------------------------------

kennknowles closed pull request #4128: [BEAM-3181][Nexmark][SQL] Implement query0
URL: https://github.com/apache/beam/pull/4128

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index 327dbecb1d5..6b76fa91823 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -26,6 +26,7 @@ dependencies {
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-io-parent:beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-extensions-parent:beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
+  shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-extensions-parent:beam-sdks-java-extensions-sql", configuration: "shadow")
   shadow library.java.google_api_services_bigquery
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
@@ -36,6 +37,7 @@ dependencies {
   shadow library.java.findbugs_jsr305
   shadow library.java.junit
   shadow library.java.hamcrest_core
+  shadow library.java.commons_lang3
   shadow project(path: ":beam-runners-parent:beam-runners-direct-java", configuration: "shadow")
   shadow library.java.slf4j_jdk14
   testCompile library.java.hamcrest_core
@@ -46,4 +48,8 @@ task packageTests(type: Jar) {
   classifier = "tests"
 }
 
+test {
+  jvmArgs "-da"
+}
+
 artifacts.archives packageTests
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index 15f51049ec9..b10fa69c3ab 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -163,6 +163,14 @@
+
+
+          org.apache.maven.plugins
+          maven-surefire-plugin
+
+            -da
+
+
 
@@ -170,6 +178,7 @@
       org.apache.beam
       beam-sdks-java-core
+      compile
 
@@ -261,5 +270,19 @@
       auto-value
       provided
+
+
+      org.apache.beam
+      beam-sdks-java-extensions-sql
+      compile
+      ${parent.version}
+
+
+
+      org.apache.commons
+      commons-lang3
+      ${apache.commons.lang.version}
+      runtime
+
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 550fbd2ce84..a4b5c5aaf5c 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -25,6 +25,7 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,7 +34,9 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.CoderException;
@@ -77,6 +80,8 @@
 import org.apache.beam.sdk.nexmark.queries.Query8Model;
 import org.apache.beam.sdk.nexmark.queries.Query9;
 import org.apache.beam.sdk.nexmark.queries.Query9Model;
+import org.apache.beam.sdk.nexmark.queries.sql.NexmarkSqlQuery;
+import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -86,6 +91,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+
 import org.joda.time.Duration;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +99,14 @@
  * Run a single Nexmark query using a given configuration.
  */
 public class NexmarkLauncher {
+
   private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
+
+  /**
+   * Command line parameter value for query language.
+   */
+  private static final String SQL = "sql";
+
   /**
    * Minimum number of samples needed for 'stead-state' rate calculation.
    */
@@ -1056,37 +1069,10 @@ public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
         return null;
       }
 
-      List queries = Arrays.asList(new Query0(configuration),
-                                   new Query1(configuration),
-                                   new Query2(configuration),
-                                   new Query3(configuration),
-                                   new Query4(configuration),
-                                   new Query5(configuration),
-                                   new Query6(configuration),
-                                   new Query7(configuration),
-                                   new Query8(configuration),
-                                   new Query9(configuration),
-                                   new Query10(configuration),
-                                   new Query11(configuration),
-                                   new Query12(configuration));
-      NexmarkQuery query = queries.get(configuration.query);
+      NexmarkQuery query = getNexmarkQuery();
       queryName = query.getName();
 
-      List models = Arrays.asList(
-          new Query0Model(configuration),
-          new Query1Model(configuration),
-          new Query2Model(configuration),
-          new Query3Model(configuration),
-          new Query4Model(configuration),
-          new Query5Model(configuration),
-          new Query6Model(configuration),
-          new Query7Model(configuration),
-          new Query8Model(configuration),
-          new Query9Model(configuration),
-          null,
-          null,
-          null);
-      NexmarkQueryModel model = models.get(configuration.query);
+      NexmarkQueryModel model = getNexmarkQueryModel();
 
       if (options.getJustModelResultRate()) {
         if (model == null) {
@@ -1154,4 +1140,81 @@ public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
       queryName = null;
     }
   }
+
+  private boolean isSql() {
+    return SQL.equalsIgnoreCase(options.getQueryLanguage());
+  }
+
+  private NexmarkQueryModel getNexmarkQueryModel() {
+    List models = createQueryModels();
+    return models.get(configuration.query);
+  }
+
+  private NexmarkQuery getNexmarkQuery() {
+    List queries = createQueries();
+
+    if (options.getQuery() >= queries.size()) {
+      throw new UnsupportedOperationException(
+          "Query " + options.getQuery()
+          + " is not implemented yet");
+    }
+
+    return queries.get(configuration.query);
+  }
+
+  private List createQueryModels() {
+    return isSql()
+        ? createSqlQueryModels()
+        : createJavaQueryModels();
+  }
+
+  private List createSqlQueryModels() {
+    return Arrays.asList(
+        null, null, null, null, null, null, null, null, null, null, null, null);
+  }
+
+  private List createJavaQueryModels() {
+    return Arrays.asList(
+        new Query0Model(configuration),
+        new Query1Model(configuration),
+        new Query2Model(configuration),
+        new Query3Model(configuration),
+        new Query4Model(configuration),
+        new Query5Model(configuration),
+        new Query6Model(configuration),
+        new Query7Model(configuration),
+        new Query8Model(configuration),
+        new Query9Model(configuration),
+        null,
+        null,
+        null);
+  }
+
+  private List createQueries() {
+    return isSql()
+        ? createSqlQueries()
+        : createJavaQueries();
+  }
+
+  private List createSqlQueries() {
+    return Arrays.asList(
+        new NexmarkSqlQuery(configuration, new SqlQuery0()));
+  }
+
+  private List createJavaQueries() {
+    return Arrays.asList(
+        new Query0(configuration),
+        new Query1(configuration),
+        new Query2(configuration),
+        new Query3(configuration),
+        new Query4(configuration),
+        new Query5(configuration),
+        new Query6(configuration),
+        new Query7(configuration),
+        new Query8(configuration),
+        new Query9(configuration),
+        new Query10(configuration),
+        new Query11(configuration),
+        new Query12(configuration));
+  }
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 2a2a5a782a6..b9c8861b680 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.nexmark;
 
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
@@ -400,4 +401,10 @@
   Long getWatermarkValidationDelaySeconds();
 
   void setWatermarkValidationDelaySeconds(Long value);
+
+  @Description("Specify 'sql' to use Beam SQL queries. Otherwise Java transforms will be used")
+  @Nullable
+  String getQueryLanguage();
+
+  void setQueryLanguage(String value);
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
index 6a37ade0162..32d7e22ff6c 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
@@ -80,14 +80,14 @@ public Auction decode(
 
   /** Extra auction properties. */
   @JsonProperty
-  private final String itemName;
+  public final String itemName;
 
   @JsonProperty
-  private final String description;
+  public final String description;
 
   /** Initial bid price, in cents. */
   @JsonProperty
-  private final long initialBid;
+  public final long initialBid;
 
   /** Reserve price, in cents. */
   @JsonProperty
@@ -110,7 +110,7 @@ public Auction decode(
 
   /** Additional arbitrary payload for performance testing. */
   @JsonProperty
-  private final String extra;
+  public final String extra;
 
 
   // For Avro only.
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
index 800f937eade..a0c78eb0af5 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
@@ -75,10 +75,10 @@ public Person decode(InputStream inStream)
   public final String name;
 
   @JsonProperty
-  private final String emailAddress;
+  public final String emailAddress;
 
   @JsonProperty
-  private final String creditCard;
+  public final String creditCard;
 
   @JsonProperty
   public final String city;
@@ -91,7 +91,7 @@ public Person decode(InputStream inStream)
 
   /** Additional arbitrary payload for performance testing. */
   @JsonProperty
-  private final String extra;
+  public final String extra;
 
   // For Avro only.
   @SuppressWarnings("unused")
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSize.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSize.java
new file mode 100644
index 00000000000..e0a5f3c4c96
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSize.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Types;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * {@link KnownSize} implementation to estimate the size of a {@link BeamRecord},
+ * similar to Java model. NexmarkLauncher/Queries infrastructure expects the events to
+ * be able to quickly provide the estimates of their sizes.
+ *
+ * The {@link BeamRecord} size is calculated at creation time.
+ *
+ * Field sizes are sizes of Java types described in {@link BeamRecordSqlType}. Except strings,
+ * which are assumed to be taking 1-byte per character plus 1 byte size.
+ */
+public class BeamRecordSize implements KnownSize {
+  private static final Coder LONG_CODER = VarLongCoder.of();
+  public static final Coder CODER = new CustomCoder() {
+    @Override
+    public void encode(BeamRecordSize beamRecordSize, OutputStream outStream)
+        throws CoderException, IOException {
+
+      LONG_CODER.encode(beamRecordSize.sizeInBytes(), outStream);
+    }
+
+    @Override
+    public BeamRecordSize decode(InputStream inStream) throws CoderException, IOException {
+      return new BeamRecordSize(LONG_CODER.decode(inStream));
+    }
+  };
+
+  private static final Map ESTIMATED_FIELD_SIZES =
+      ImmutableMap.builder()
+          .put(Types.TINYINT, bytes(Byte.SIZE))
+          .put(Types.SMALLINT, bytes(Short.SIZE))
+          .put(Types.INTEGER, bytes(Integer.SIZE))
+          .put(Types.BIGINT, bytes(Long.SIZE))
+          .put(Types.FLOAT, bytes(Float.SIZE))
+          .put(Types.DOUBLE, bytes(Double.SIZE))
+          .put(Types.DECIMAL, 32)
+          .put(Types.BOOLEAN, 1)
+          .put(Types.TIME, bytes(Long.SIZE))
+          .put(Types.DATE, bytes(Long.SIZE))
+          .put(Types.TIMESTAMP, bytes(Long.SIZE))
+          .build();
+
+  public static ParDo.SingleOutput parDo() {
+    return ParDo.of(new DoFn() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(BeamRecordSize.of(c.element()));
+      }
+    });
+  }
+
+  public static BeamRecordSize of(BeamRecord beamRecord) {
+    return new BeamRecordSize(sizeInBytes(beamRecord));
+  }
+
+  private static long sizeInBytes(BeamRecord beamRecord) {
+    BeamRecordSqlType recordType = (BeamRecordSqlType) beamRecord.getDataType();
+    long size = 1; // nulls bitset
+
+    for (int fieldIndex = 0; fieldIndex < recordType.getFieldCount(); fieldIndex++) {
+      Integer fieldType = recordType.getFieldTypeByIndex(fieldIndex);
+
+      Integer estimatedSize = ESTIMATED_FIELD_SIZES.get(fieldType);
+
+      if (estimatedSize != null) {
+        size += estimatedSize;
+        continue;
+      }
+
+      if (isString(fieldType)) {
+        size += beamRecord.getString(fieldIndex).length() + 1;
+        continue;
+      }
+
+      throw new IllegalStateException("Unexpected field type " + fieldType);
+    }
+
+    return size;
+  }
+
+  private long sizeInBytes;
+
+  private BeamRecordSize(long sizeInBytes) {
+    this.sizeInBytes = sizeInBytes;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return sizeInBytes;
+  }
+
+  private static boolean isString(Integer fieldType) {
+    return fieldType == Types.CHAR || fieldType == Types.VARCHAR;
+  }
+
+  private static Integer bytes(int size) {
+    return size / Byte.SIZE;
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecord.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecord.java
new file mode 100644
index 00000000000..942bb50d894
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecord.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql;
+
+import java.util.Map;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping;
+import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelFieldsAdapter;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * Convert Java model object to BeamRecord.
+ */
+public class ToBeamRecord {
+
+  static final ToBeamRecord INSTANCE = new ToBeamRecord(ModelAdaptersMapping.ADAPTERS);
+
+  private Map modelTypeAdapters;
+
+  private ToBeamRecord(Map modelTypeAdapters) {
+    this.modelTypeAdapters = modelTypeAdapters;
+  }
+
+  private BeamRecord toRecord(Event event) {
+    if (event == null) {
+      return null;
+    }
+
+    KnownSize model = getModel(event);
+    Class modelClass = model.getClass();
+
+    if (!modelTypeAdapters.containsKey(modelClass)) {
+      throw new IllegalArgumentException(
+          "Beam SQL record type adapter is not registered for " + model.getClass().getSimpleName());
+    }
+
+    ModelFieldsAdapter adapter = modelTypeAdapters.get(modelClass);
+
+    return new BeamRecord(adapter.getRecordType(), adapter.getFieldsValues(model));
+  }
+
+  private KnownSize getModel(Event event) {
+    if (event.newAuction != null) {
+      return event.newAuction;
+    } else if (event.newPerson != null) {
+      return event.newPerson;
+    } else if (event.bid != null) {
+      return event.bid;
+    }
+
+    throw new IllegalStateException("Unsupported event type " + event);
+  }
+
+  public static ParDo.SingleOutput parDo() {
+    return ParDo.of(new DoFn() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        BeamRecord beamRecord = INSTANCE.toRecord(c.element());
+        c.output(beamRecord);
+      }
+    });
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMapping.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMapping.java
new file mode 100644
index 00000000000..8882708d13d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMapping.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Person;
+
+/**
+ * Maps Java model classes to Beam SQL record types.
+ */
+public class ModelAdaptersMapping {
+
+  public static final Map ADAPTERS =
+      ImmutableMap.builder()
+          .put(Auction.class, auctionAdapter())
+          .put(Bid.class, bidAdapter())
+          .put(Person.class, personAdapter())
+          .build();
+
+  private static ModelFieldsAdapter personAdapter() {
+    return new ModelFieldsAdapter(
+        BeamRecordSqlType.builder()
+            .withBigIntField("id")
+            .withVarcharField("name")
+            .withVarcharField("emailAddress")
+            .withVarcharField("creditCard")
+            .withVarcharField("city")
+            .withVarcharField("state")
+            .withBigIntField("dateTime")
+            .withVarcharField("extra")
+            .build()) {
+      @Override
+      public List getFieldsValues(Person p) {
+        return Collections.unmodifiableList(Arrays.asList(
+            p.id,
+            p.name,
+            p.emailAddress,
+            p.creditCard,
+            p.city,
+            p.state,
+            p.dateTime,
+            p.extra
+        ));
+      }
+    };
+  }
+
+  private static ModelFieldsAdapter bidAdapter() {
+    return new ModelFieldsAdapter(
+        BeamRecordSqlType.builder()
+            .withBigIntField("auction")
+            .withBigIntField("bidder")
+            .withBigIntField("price")
+            .withBigIntField("dateTime")
+            .withVarcharField("extra")
+            .build()) {
+      @Override
+      public List getFieldsValues(Bid b) {
+        return Collections.unmodifiableList(Arrays.asList(
+            b.auction,
+            b.bidder,
+            b.price,
+            b.dateTime,
+            b.extra
+        ));
+      }
+    };
+  }
+
+  private static ModelFieldsAdapter auctionAdapter() {
+    return new ModelFieldsAdapter(
+        BeamRecordSqlType.builder()
+            .withBigIntField("id")
+            .withVarcharField("itemName")
+            .withVarcharField("description")
+            .withBigIntField("initialBid")
+            .withBigIntField("reserve")
+            .withBigIntField("dateTime")
+            .withBigIntField("expires")
+            .withBigIntField("seller")
+            .withBigIntField("category")
+            .withVarcharField("extra")
+            .build()) {
+      @Override
+      public List getFieldsValues(Auction a) {
+        return Collections.unmodifiableList(Arrays.asList(
+            a.id,
+            a.itemName,
+            a.description,
+            a.initialBid,
+            a.reserve,
+            a.dateTime,
+            a.expires,
+            a.seller,
+            a.category,
+            a.extra
+        ));
+      }
+    };
+  }
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelFieldsAdapter.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelFieldsAdapter.java
new file mode 100644
index 00000000000..cf43cc32489
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelFieldsAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
+
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Helper class to help map Java model fields to Beam SQL Record Type fields.
+ */
+public abstract class ModelFieldsAdapter {
+
+  private BeamRecordSqlType recordType;
+
+  ModelFieldsAdapter(BeamRecordSqlType recordType) {
+    this.recordType = recordType;
+  }
+
+  public BeamRecordType getRecordType() {
+    return recordType;
+  }
+
+  public abstract List getFieldsValues(T model);
+}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
new file mode 100644
index 00000000000..b9554a8a92e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * org.apache.beam.sdk.nexmark.model.sql.adapter.
+ */
+
+/**
+ * Model adapter which contains a mapping between specific Java model to a BeamRecord.
+ */
+package org.apache.beam.sdk.nexmark.model.sql.adapter;
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
new file mode 100644
index 00000000000..3b1b5c131c2
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * org.apache.beam.sdk.nexmark.model.sql.
+ */
+
+/**
+ * Java model conversion to Beam SQL model.
+ */
+package org.apache.beam.sdk.nexmark.model.sql;
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
index d070058dff9..2d7ca87044e 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
@@ -36,6 +36,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
+
 import org.joda.time.Instant;
 
 /**
@@ -83,7 +84,7 @@ public void processElement(ProcessContext c) {
       };
 
   /** Predicate to detect a new bid event. */
-  private static final SerializableFunction IS_BID =
+  public static final SerializableFunction IS_BID =
       new SerializableFunction() {
         @Override
         public Boolean apply(Event event) {
@@ -211,7 +212,7 @@ public void processElement(ProcessContext c) {
   private final Monitor endOfStreamMonitor;
   private final Counter fatalCounter;
 
-  NexmarkQuery(NexmarkConfiguration configuration, String name) {
+  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
     super(name);
     this.configuration = configuration;
     if (configuration.debug) {
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
new file mode 100644
index 00000000000..229ed425a69
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.queries.sql;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.sql.BeamRecordSize;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Executor for Nexmark queries. Allows to decouple from NexmarkQuery
+ * and test independently.
+ */ +public class NexmarkSqlQuery extends NexmarkQuery { + + private PTransform, PCollection> queryTransform; + + public NexmarkSqlQuery(NexmarkConfiguration configuration, + PTransform, PCollection> queryTransform) { + super(configuration, queryTransform.getName()); + this.queryTransform = queryTransform; + } + + @Override + protected PCollection applyPrim(PCollection events) { + PCollection queryResults = events.apply(queryTransform); + + PCollection resultRecordsSizes = queryResults + .apply(BeamRecordSize.parDo()) + .setCoder(BeamRecordSize.CODER); + + return NexmarkUtils.castToKnownSize(name, resultRecordsSizes); + } +} diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java new file mode 100644 index 00000000000..17e5f0cefcf --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.sdk.nexmark.queries.sql; + +import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.coders.BeamRecordCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.extensions.sql.BeamSql; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.sql.ToBeamRecord; +import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; + +/** + * Query 0: Pass events through unchanged. + * + *

This measures the overhead of the Beam SQL implementation and test harness like + * conversion from Java model classes to Beam records. + * + *

{@link Bid} events are used here at the moment, as they are most numerous + * with default configuration. + */ +public class SqlQuery0 extends PTransform<PCollection<Event>, PCollection<BeamRecord>> { + + private static final BeamSql.SimpleQueryTransform QUERY = + BeamSql.query("SELECT * FROM PCOLLECTION"); + + public SqlQuery0() { + super("SqlQuery0"); + } + + @Override + public PCollection expand(PCollection allEvents) { + + BeamRecordCoder bidRecordCoder = getBidRecordCoder(); + + PCollection bidEventsRecords = allEvents + .apply(Filter.by(IS_BID)) + .apply(ToBeamRecord.parDo()) + .apply(getName() + ".Serialize", logBytesMetric(bidRecordCoder)) + .setCoder(bidRecordCoder); + + return bidEventsRecords.apply(QUERY).setCoder(bidRecordCoder); + } + + private PTransform, PCollection> logBytesMetric( + final BeamRecordCoder coder) { + + return ParDo.of(new DoFn() { + private final Counter bytesMetric = Metrics.counter(name , "bytes"); + + @ProcessElement + public void processElement(ProcessContext c) throws CoderException, IOException { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + coder.encode(c.element(), outStream, Coder.Context.OUTER); + byte[] byteArray = outStream.toByteArray(); + bytesMetric.inc((long) byteArray.length); + ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray); + BeamRecord record = coder.decode(inStream, Coder.Context.OUTER); + c.output(record); + } + }); + } + + private BeamRecordCoder getBidRecordCoder() { + return ModelAdaptersMapping.ADAPTERS.get(Bid.class).getRecordType().getRecordCoder(); + } +} diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java new file mode 100644 index 00000000000..f6a51fb7ee1 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * org.apache.beam.sdk.nexmark.queries.sql. + */ + +/** + * Beam SQL Nexmark queries. + */ +package org.apache.beam.sdk.nexmark.queries.sql; diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSizeTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSizeTest.java new file mode 100644 index 00000000000..7304dc9a07a --- /dev/null +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/BeamRecordSizeTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.model.sql; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Unit tests for {@link BeamRecordSize}.
+ */ +public class BeamRecordSizeTest { + + private static final BeamRecordSqlType RECORD_TYPE = BeamRecordSqlType.builder() + .withTinyIntField("f_tinyint") + .withSmallIntField("f_smallint") + .withIntegerField("f_int") + .withBigIntField("f_bigint") + .withFloatField("f_float") + .withDoubleField("f_double") + .withDecimalField("f_decimal") + .withBooleanField("f_boolean") + .withTimeField("f_time") + .withDateField("f_date") + .withTimestampField("f_timestamp") + .withCharField("f_char") + .withVarcharField("f_varchar") + .build(); + + private static final List VALUES = ImmutableList.of( + (byte) 1, + (short) 2, + (int) 3, + (long) 4, + (float) 5.12, + (double) 6.32, + new BigDecimal(7), + false, + new GregorianCalendar(2019, 03, 02), + new Date(10L), + new Date(11L), + "12", + "13"); + + private static final long RECORD_SIZE = 91L; + + private static final BeamRecord RECORD = new BeamRecord(RECORD_TYPE, VALUES); + + @Rule public TestPipeline testPipeline = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testCalculatesCorrectSize() throws Exception { + assertEquals(RECORD_SIZE, BeamRecordSize.of(RECORD).sizeInBytes()); + } + + @Test + public void testParDoConvertsToRecordSize() throws Exception { + PCollection records = testPipeline.apply( + TestStream.create(RECORD_TYPE.getRecordCoder()) + .addElements(RECORD) + .advanceWatermarkToInfinity()); + + PAssert + .that(records) + .satisfies(new CorrectSize()); + + testPipeline.run(); + } + + static class CorrectSize implements SerializableFunction, Void> { + @Override + public Void apply(Iterable input) { + BeamRecordSize recordSize = BeamRecordSize.of(Iterables.getOnlyElement(input)); + assertThat(recordSize.sizeInBytes(), equalTo(RECORD_SIZE)); + return null; + } + } +} diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecordTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecordTest.java
new file mode 100644 index 00000000000..c13138d85cf --- /dev/null +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/ToBeamRecordTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.model.sql; + +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Unit tests for {@link ToBeamRecord}.
+ */ +public class ToBeamRecordTest { + private static final Person PERSON = + new Person(3L, "name", "email", "cc", "city", "state", 329823L, "extra"); + + private static final Bid BID = + new Bid(5L, 3L, 123123L, 43234234L, "extra2"); + + private static final Auction AUCTION = + new Auction(5L, "item", "desc", 342L, 321L, 3423342L, 2349234L, 3L, 1L, "extra3"); + + @Rule + public TestPipeline testPipeline = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testConvertsBids() throws Exception { + PCollection bids = testPipeline.apply( + TestStream.create(Event.CODER) + .addElements(new Event(BID)) + .advanceWatermarkToInfinity()); + + BeamRecord expectedBidRecord = + new BeamRecord( + ModelAdaptersMapping.ADAPTERS.get(Bid.class).getRecordType(), + ModelAdaptersMapping.ADAPTERS.get(Bid.class).getFieldsValues(BID)); + + PAssert + .that(bids.apply(ToBeamRecord.parDo())) + .containsInAnyOrder(expectedBidRecord); + + testPipeline.run(); + } + + @Test + public void testConvertsPeople() throws Exception { + PCollection people = testPipeline.apply( + TestStream.create(Event.CODER) + .addElements(new Event(PERSON)) + .advanceWatermarkToInfinity()); + + BeamRecord expectedPersonRecord = + new BeamRecord( + ModelAdaptersMapping.ADAPTERS.get(Person.class).getRecordType(), + ModelAdaptersMapping.ADAPTERS.get(Person.class).getFieldsValues(PERSON)); + + PAssert + .that(people.apply(ToBeamRecord.parDo())) + .containsInAnyOrder(expectedPersonRecord); + + testPipeline.run(); + } + + @Test + public void testConvertsAuctions() throws Exception { + PCollection auctions = testPipeline.apply( + TestStream.create(Event.CODER) + .addElements(new Event(AUCTION)) + .advanceWatermarkToInfinity()); + + BeamRecord expectedAuctionRecord = + new BeamRecord( + ModelAdaptersMapping.ADAPTERS.get(Auction.class).getRecordType(), + ModelAdaptersMapping.ADAPTERS.get(Auction.class).getFieldsValues(AUCTION));
+ + PAssert + .that(auctions.apply(ToBeamRecord.parDo())) + .containsInAnyOrder(expectedAuctionRecord); + + testPipeline.run(); + } +} diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMappingTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMappingTest.java new file mode 100644 index 00000000000..1eccfffc804 --- /dev/null +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/adapter/ModelAdaptersMappingTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.model.sql.adapter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Person; +import org.junit.Test; + +/** + * Unit tests for {@link ModelAdaptersMapping}.
+ */ +public class ModelAdaptersMappingTest { + + private static final Person PERSON = + new Person(3L, "name", "email", "cc", "city", "state", 329823L, "extra"); + + private static final BeamRecordSqlType PERSON_RECORD_TYPE = BeamRecordSqlType.builder() + .withBigIntField("id") + .withVarcharField("name") + .withVarcharField("emailAddress") + .withVarcharField("creditCard") + .withVarcharField("city") + .withVarcharField("state") + .withBigIntField("dateTime") + .withVarcharField("extra") + .build(); + + private static final Bid BID = + new Bid(5L, 3L, 123123L, 43234234L, "extra2"); + + private static final BeamRecordSqlType BID_RECORD_TYPE = BeamRecordSqlType.builder() + .withBigIntField("auction") + .withBigIntField("bidder") + .withBigIntField("price") + .withBigIntField("dateTime") + .withVarcharField("extra") + .build(); + + private static final Auction AUCTION = + new Auction(5L, "item", "desc", 342L, 321L, 3423342L, 2349234L, 3L, 1L, "extra3"); + + private static final BeamRecordSqlType AUCTION_RECORD_TYPE = BeamRecordSqlType.builder() + .withBigIntField("id") + .withVarcharField("itemName") + .withVarcharField("description") + .withBigIntField("initialBid") + .withBigIntField("reserve") + .withBigIntField("dateTime") + .withBigIntField("expires") + .withBigIntField("seller") + .withBigIntField("category") + .withVarcharField("extra") + .build(); + + @Test + public void hasAdaptersForSupportedModels() throws Exception { + assertTrue(ModelAdaptersMapping.ADAPTERS.containsKey(Bid.class)); + assertTrue(ModelAdaptersMapping.ADAPTERS.containsKey(Person.class)); + assertTrue(ModelAdaptersMapping.ADAPTERS.containsKey(Auction.class)); + + assertNotNull(ModelAdaptersMapping.ADAPTERS.get(Bid.class)); + assertNotNull(ModelAdaptersMapping.ADAPTERS.get(Person.class)); + assertNotNull(ModelAdaptersMapping.ADAPTERS.get(Auction.class)); + } + + @Test + public void testBidAdapterRecordType() { + ModelFieldsAdapter adapter =
ModelAdaptersMapping.ADAPTERS.get(Bid.class); + + BeamRecordSqlType bidRecordType = (BeamRecordSqlType) adapter.getRecordType(); + + assertEquals(BID_RECORD_TYPE.getFieldNames(), bidRecordType.getFieldNames()); + assertEquals(BID_RECORD_TYPE.getFieldTypes(), bidRecordType.getFieldTypes()); + } + + @Test + public void testPersonAdapterRecordType() { + ModelFieldsAdapter adapter = ModelAdaptersMapping.ADAPTERS.get(Person.class); + + BeamRecordSqlType personRecordType = (BeamRecordSqlType) adapter.getRecordType(); + + assertEquals(PERSON_RECORD_TYPE.getFieldNames(), personRecordType.getFieldNames()); + assertEquals(PERSON_RECORD_TYPE.getFieldTypes(), personRecordType.getFieldTypes()); + } + + @Test + public void testAuctionAdapterRecordType() { + ModelFieldsAdapter adapter = ModelAdaptersMapping.ADAPTERS.get(Auction.class); + + BeamRecordSqlType auctionRecordType = (BeamRecordSqlType) adapter.getRecordType(); + + assertEquals(AUCTION_RECORD_TYPE.getFieldNames(), auctionRecordType.getFieldNames()); + assertEquals(AUCTION_RECORD_TYPE.getFieldTypes(), auctionRecordType.getFieldTypes()); + } + + @Test + public void testPersonAdapterGetsFieldValues() throws Exception { + ModelFieldsAdapter adapter = ModelAdaptersMapping.ADAPTERS.get(Person.class); + List values = adapter.getFieldsValues(PERSON); + assertEquals(PERSON.id, values.get(0)); + assertEquals(PERSON.name, values.get(1)); + assertEquals(PERSON.emailAddress, values.get(2)); + assertEquals(PERSON.creditCard, values.get(3)); + assertEquals(PERSON.city, values.get(4)); + assertEquals(PERSON.state, values.get(5)); + assertEquals(PERSON.dateTime, values.get(6)); + assertEquals(PERSON.extra, values.get(7)); + } + + @Test + public void testBidAdapterGetsFieldValues() throws Exception { + ModelFieldsAdapter adapter = ModelAdaptersMapping.ADAPTERS.get(Bid.class); + List values = adapter.getFieldsValues(BID); + assertEquals(BID.auction, values.get(0)); + assertEquals(BID.bidder,
values.get(1)); + assertEquals(BID.price, values.get(2)); + assertEquals(BID.dateTime, values.get(3)); + assertEquals(BID.extra, values.get(4)); + } + + @Test + public void testAuctionAdapterGetsFieldValues() throws Exception { + ModelFieldsAdapter adapter = ModelAdaptersMapping.ADAPTERS.get(Auction.class); + List values = adapter.getFieldsValues(AUCTION); + assertEquals(AUCTION.id, values.get(0)); + assertEquals(AUCTION.itemName, values.get(1)); + assertEquals(AUCTION.description, values.get(2)); + assertEquals(AUCTION.initialBid, values.get(3)); + assertEquals(AUCTION.reserve, values.get(4)); + assertEquals(AUCTION.dateTime, values.get(5)); + assertEquals(AUCTION.expires, values.get(6)); + assertEquals(AUCTION.seller, values.get(7)); + assertEquals(AUCTION.category, values.get(8)); + assertEquals(AUCTION.extra, values.get(9)); + } +} diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java new file mode 100644 index 00000000000..d5889d8903c --- /dev/null +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.queries.sql; + +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping; +import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelFieldsAdapter; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unit tests for {@link SqlQuery0}. + */ +public class SqlQuery0Test { + + private static final Bid BID1 = + new Bid(5L, 3L, 123123L, 43234234L, "extra1"); + + private static final Bid BID2 = + new Bid(6L, 4L, 134123L, 13234234L, "extra2"); + + private static final ModelFieldsAdapter BID_ADAPTER = + ModelAdaptersMapping.ADAPTERS.get(Bid.class); + + private static final BeamRecord BID1_RECORD = + new BeamRecord(BID_ADAPTER.getRecordType(), BID_ADAPTER.getFieldsValues(BID1)); + + private static final BeamRecord BID2_RECORD = + new BeamRecord(BID_ADAPTER.getRecordType(), BID_ADAPTER.getFieldsValues(BID2)); + + @Rule + public TestPipeline testPipeline = TestPipeline.create(); + + @Test + public void testPassesBidsThrough() throws Exception { + PCollection bids = testPipeline.apply( + TestStream.create(Event.CODER) + .addElements(new Event(BID1)) + .addElements(new Event(BID2)) + .advanceWatermarkToInfinity()); + + PAssert + .that(bids.apply(new SqlQuery0())) + .containsInAnyOrder(BID1_RECORD, BID2_RECORD); + + testPipeline.run(); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: users@infra.apache.org

> [Nexmark][SQL] Implement a basic pass-through SQL query
> -------------------------------------------------------
>
> Key: BEAM-3181
> URL: https://issues.apache.org/jira/browse/BEAM-3181
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-sql
> Reporter: Anton Kedin
> Assignee: Anton Kedin
>
> Implement a basic query to make sure some scaffolding is in place, like conversion from Java models to BeamRecords and then to KnownSize.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
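
The byte-counting round trip that logBytesMetric performs in SqlQuery0 in the diff above (encode the record, add the encoded length to a counter, decode the bytes again, emit the decoded copy) can be sketched without Beam. This is only an illustration under stated assumptions, not code from the pull request: BidRow, encode, decode, roundTrip, and the AtomicLong counter are hypothetical stand-ins for BeamRecord, BeamRecordCoder, and the Beam Counter metric, and the DataOutputStream wire format is chosen here purely for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicLong;

public class RoundTripByteCounter {

  /** Hypothetical stand-in for a Bid record; field names mirror the Bid record type in the diff. */
  static final class BidRow {
    final long auction;
    final long bidder;
    final long price;
    final long dateTime;
    final String extra;

    BidRow(long auction, long bidder, long price, long dateTime, String extra) {
      this.auction = auction;
      this.bidder = bidder;
      this.price = price;
      this.dateTime = dateTime;
      this.extra = extra;
    }
  }

  /** Hypothetical stand-in for the "bytes" counter metric. */
  static final AtomicLong bytesMetric = new AtomicLong();

  /** Assumed wire format: four 8-byte longs followed by a length-prefixed string. */
  static byte[] encode(BidRow row) throws IOException {
    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(outStream);
    data.writeLong(row.auction);
    data.writeLong(row.bidder);
    data.writeLong(row.price);
    data.writeLong(row.dateTime);
    data.writeUTF(row.extra);
    data.flush();
    return outStream.toByteArray();
  }

  /** Reads the fields back in the same order encode wrote them. */
  static BidRow decode(byte[] bytes) throws IOException {
    DataInputStream data = new DataInputStream(new ByteArrayInputStream(bytes));
    return new BidRow(
        data.readLong(), data.readLong(), data.readLong(), data.readLong(), data.readUTF());
  }

  /** Encodes the row, records the encoded size, and returns the decoded copy. */
  static BidRow roundTrip(BidRow row) {
    try {
      byte[] byteArray = encode(row);
      bytesMetric.addAndGet(byteArray.length);
      return decode(byteArray);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    BidRow result = roundTrip(new BidRow(5L, 3L, 123123L, 43234234L, "extra1"));
    // 4 longs (32 bytes) + 2-byte length prefix + 6 ASCII bytes = 40
    System.out.println(result.extra + " " + bytesMetric.get()); // prints "extra1 40"
  }
}
```

Decoding what was just encoded costs extra work per element, but it keeps the serialization overhead inside the measured path, which appears to be the point of the pass-through query.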