From commits-return-85959-archive-asf-public=cust-asf.ponee.io@beam.apache.org Tue Jul 31 21:14:13 2018
Date: Tue, 31 Jul 2018 19:14:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Subject: [jira] [Work logged] (BEAM-2661) Add KuduIO

[ https://issues.apache.org/jira/browse/BEAM-2661?focusedWorklogId=129456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129456 ]

ASF GitHub Bot logged work on BEAM-2661:
----------------------------------------

Author: ASF GitHub Bot
Created on: 31/Jul/18 19:13
Start Date: 31/Jul/18 19:13
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #6021: [BEAM-2661] Adds KuduIO
URL: https://github.com/apache/beam/pull/6021

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance.
As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/kudu/build.gradle b/sdks/java/io/kudu/build.gradle
new file mode 100644
index 00000000000..5457ec7dfdd
--- /dev/null
+++ b/sdks/java/io/kudu/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Kudu"
+ext.summary = "Library to read and write from/to Kudu"
+
+test {
+  systemProperty "log4j.configuration", "log4j-test.properties"
+  jvmArgs "-XX:-UseGCOverheadLimit"
+  if (System.getProperty("beamSurefireArgline")) {
+    jvmArgs System.getProperty("beamSurefireArgline")
+  }
+}
+
+def kudu_version = "1.4.0"
+
+dependencies {
+  compile library.java.guava
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+  shadow "org.apache.kudu:kudu-client:$kudu_version"
+  shadow library.java.slf4j_api
+  testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest")
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  testCompile library.java.junit
+}
+
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
new file mode 100644
index 00000000000..5694946d3dd
--- /dev/null
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded source and sink for Kudu.
+ *
+ * <p>For more information, see the online Kudu documentation.
+ *
+ * <h3>Reading from Kudu</h3>
+ *
+ * <p>{@code KuduIO} provides a source to read and returns a bounded collection of entities as
+ * {@code PCollection<T>}. An entity is built by parsing a Kudu {@link RowResult} using the
+ * provided {@link SerializableFunction<RowResult, T>}.
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(
+ *     KuduIO.<String>read()
+ *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ *         .withTable("table")
+ *         .withParseFn(
+ *             (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+ *         .withCoder(StringUtf8Coder.of()));
+ *     // above options illustrate a typical minimum set, returns PCollection<String>
+ * }</pre>
+ *
+ * <p>{@code withCoder(...)} may be omitted if it can be inferred from the {@link CoderRegistry}.
+ * However, when using a lambda expression or an anonymous inner class to define the function, type
+ * erasure will prohibit this. In such cases you are required to explicitly set the coder as in the
+ * above example.
+ *
+ * <p>Optionally, you can provide {@code withPredicates(...)} to apply a query to filter rows from
+ * the Kudu table.
+ *
+ * <p>Optionally, you can provide {@code withProjectedColumns(...)} to limit the columns returned
+ * from the Kudu scan to improve performance. The columns required in the {@code ParseFn} must be
+ * declared in the projected columns.
+ *
+ * <p>Optionally, you can provide {@code withBatchSize(...)} to set the number of bytes returned
+ * from the Kudu scanner in each batch.
+ *
+ * <p>Optionally, you can provide {@code withFaultTolerent(...)} to enforce the read scan to resume
+ * a scan on another tablet server if the current server fails.
+ *
+ * <h3>Writing to Kudu</h3>
+ *
+ * <p>The Kudu sink executes a set of operations on a single table. It takes as input a {@link
+ * PCollection PCollection<T>} and a {@link FormatFunction<T>} which is responsible for
+ * converting the input into an idempotent transformation on a row.
+ *
+ * <p>To configure a Kudu sink, you must supply the Kudu master addresses, the table name and a
+ * {@link FormatFunction} to convert the input records, for example:
+ *
+ * <pre>{@code
+ * PCollection data = ...;
+ * FormatFunction fn = ...;
+ *
+ * data.apply("write",
+ *     KuduIO.write()
+ *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ *         .withTable("table")
+ *         .withFormatFn(fn));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ * + * {@code KuduIO} does not support authentication in this release. + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class KuduIO { + private static final Logger LOG =3D LoggerFactory.getLogger(KuduIO.class= ); + + private KuduIO() {} + + public static Read read() { + return new AutoValue_KuduIO_Read.Builder().setKuduService(new KuduS= erviceImpl<>()).build(); + } + + public static Write write() { + return new AutoValue_KuduIO_Write.Builder().setKuduService(new Kudu= ServiceImpl<>()).build(); + } + + /** + * An interface used by the KuduIO Write to convert an input record into= an Operation to apply as + * a mutation in Kudu. + */ + @FunctionalInterface + public interface FormatFunction extends SerializableFunction, Operation> {} + + /** Implementation of {@link KuduIO#read()}. */ + @AutoValue + public abstract static class Read extends PTransform> { + @Nullable + abstract List getMasterAddresses(); + + @Nullable + abstract String getTable(); + + @Nullable + abstract Integer getBatchSize(); + + @Nullable + abstract List getProjectedColumns(); + + @Nullable + abstract List getSerializablePredicates(); + + @Nullable + abstract Boolean getFaultTolerent(); + + @Nullable + abstract SerializableFunction getParseFn(); + + @Nullable + abstract Coder getCoder(); + + @Nullable + abstract KuduService getKuduService(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMasterAddresses(List masterAddresses)= ; + + abstract Builder setTable(String table); + + abstract Builder setBatchSize(Integer batchSize); + + abstract Builder setProjectedColumns(List projectedColumn= s); + + abstract Builder setSerializablePredicates( + List serializablePredicates); + + abstract Builder setFaultTolerent(Boolean faultTolerent); + + abstract Builder setParseFn(SerializableFunction pa= rseFn); + + abstract Builder setCoder(Coder coder); + + abstract Builder setKuduService(KuduService kuduService); + + abstract Read build(); + } + + @VisibleForTesting + Coder inferCoder(CoderRegistry coderRegistry) { + try { + return getCoder() !=3D null + ? getCoder() + : coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()= )); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to infer coder for output of parseFn (" + + TypeDescriptors.outputOf(getParseFn()) + + "). Specify it explicitly using withCoder().", + e); + } + } + + /** Reads from the Kudu cluster on the specified master addresses. */ + public Read withMasterAddresses(String masterAddresses) { + checkArgument(masterAddresses !=3D null, "masterAddresses cannot be = null or empty"); + return builder().setMasterAddresses(Splitter.on(",").splitToList(mas= terAddresses)).build(); + } + + /** Reads from the specified table. */ + public Read withTable(String table) { + checkArgument(table !=3D null, "table cannot be null"); + return builder().setTable(table).build(); + } + + /** Provides the function to parse a row from Kudu into the typed obje= ct. */ + public Read withParseFn(SerializableFunction parseFn)= { + checkArgument(parseFn !=3D null, "parseFn cannot be null"); + return builder().setParseFn(parseFn).build(); + } + + /** Filters the rows read from Kudu using the given predicates. 
*/ + public Read withPredicates(List predicates) { + checkArgument(predicates !=3D null, "predicates cannot be null"); + // reuse the kudu protobuf serialization mechanism + List serializablePredicates =3D + predicates.stream().map(KuduPredicate::toPB).collect(Collectors.= toList()); + return builder().setSerializablePredicates(serializablePredicates).b= uild(); + } + + /** Filters the columns read from the table to include only those spec= ified. */ + public Read withProjectedColumns(List projectedColumns) { + checkArgument(projectedColumns !=3D null, "projectedColumns cannot b= e null"); + return builder().setProjectedColumns(projectedColumns).build(); + } + + /** Reads from the table in batches of the specified size. */ + public Read withBatchSize(int batchSize) { + checkArgument(batchSize >=3D 0, "batchSize must not be negative"); + return builder().setBatchSize(batchSize).build(); + } + + /** + * Instructs the read scan to resume a scan on another tablet server i= f the current server fails + * and faultTolerant is set to true. + */ + public Read withFaultTolerent(boolean faultTolerent) { + return builder().setFaultTolerent(faultTolerent).build(); + } + + /** + * Sets a {@link Coder} for the result of the parse function. This may= be required if a coder + * can not be inferred automatically. + */ + public Read withCoder(Coder coder) { + checkArgument(coder !=3D null, "coder cannot be null"); + return builder().setCoder(coder).build(); + } + + /** Specify an instance of {@link KuduService} used to connect and rea= d from Kudu. */ + @VisibleForTesting + Read withKuduService(KuduService kuduService) { + checkArgument(kuduService !=3D null, "kuduService cannot be null"); + return builder().setKuduService(kuduService).build(); + } + + @Override + public PCollection expand(PBegin input) { + Pipeline p =3D input.getPipeline(); + final Coder coder =3D inferCoder(p.getCoderRegistry()); + return input.apply(org.apache.beam.sdk.io.Read.from(new KuduSource<>= (this, coder, null))); + } + + @Override + public void validate(PipelineOptions pipelineOptions) { + checkState( + getMasterAddresses() !=3D null, + "KuduIO.read() requires a list of master addresses to be set via= withMasterAddresses(masterAddresses)"); + checkState( + getTable() !=3D null, + "KuduIO.read() requires a table name to be set via withTableName= (tableName)"); + checkState( + getParseFn() !=3D null, + "KuduIO.read() requires a parse function to be set via withParse= Fn(parseFn)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("masterAddresses", getMasterAddresses()= .toString())); + builder.add(DisplayData.item("table", getTable())); + } + } + + static class KuduSource extends BoundedSource { + final Read spec; + private final Coder coder; + @Nullable byte[] serializedToken; // only during a split + + KuduSource(Read spec, Coder coder, byte[] serializedToken) { + this.spec =3D spec; + this.coder =3D coder; + this.serializedToken =3D serializedToken; + } + + // A Kudu source can be split once only providing a source per tablet + @Override + public List> split(long desiredBundleSizeBytes, Pipel= ineOptions options) + throws KuduException { + if (serializedToken !=3D null) { + return Collections.singletonList(this); // we are already a split + + } else { + Stream> sources =3D + spec.getKuduService() + .createTabletScanners(spec) + .stream() + .map(s -> new KuduIO.KuduSource(spec, spec.getCoder(), = s)); + return 
sources.collect(Collectors.toList()); + } + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) { + return 0; // Kudu does not expose tablet sizes + } + + @Override + public BoundedReader createReader(PipelineOptions options) { + return spec.getKuduService().createReader(this); + } + + @Override + public Coder getOutputCoder() { + return coder; + } + } + + /** + * A {@link PTransform} that writes to Kudu. See the class-level Javadoc= on {@link KuduIO} for + * more information. + * + * @see KuduIO + */ + @AutoValue + public abstract static class Write extends PTransform,= PDone> { + @Nullable + abstract List masterAddresses(); + + @Nullable + abstract String table(); + + @Nullable + abstract FormatFunction formatFn(); + + @Nullable + abstract KuduService kuduService(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMasterAddresses(List masterAddresses)= ; + + abstract Builder setTable(String table); + + abstract Builder setFormatFn(FormatFunction formatFn); + + abstract Builder setKuduService(KuduService kuduService); + + abstract Write build(); + } + + /** Writes to the Kudu cluster on the specified master addresses. */ + public Write withMasterAddresses(String masterAddresses) { + checkArgument(masterAddresses !=3D null, "masterAddresses cannot be = null or empty"); + return builder().setMasterAddresses(Splitter.on(",").splitToList(mas= terAddresses)).build(); + } + + /** Writes to the specified table. */ + public Write withTable(String table) { + checkArgument(table !=3D null, "table cannot be null"); + return builder().setTable(table).build(); + } + + /** Writes using the given function to create the mutation operations = from the input. */ + public Write withFormatFn(FormatFunction formatFn) { + checkArgument(formatFn !=3D null, "formatFn cannot be null"); + return builder().setFormatFn(formatFn).build(); + } + + /** Specify the {@link KuduService} used to connect and write into the= Kudu table. 
*/ + @VisibleForTesting + Write withKuduService(KuduService kuduService) { + checkArgument(kuduService !=3D null, "kuduService cannot be null"); + return builder().setKuduService(kuduService).build(); + } + + @Override + public PDone expand(PCollection input) { + input.apply(ParDo.of(new WriteFn(this))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PipelineOptions pipelineOptions) { + checkState( + masterAddresses() !=3D null, + "KuduIO.write() requires a list of master addresses to be set vi= a withMasterAddresses(masterAddresses)"); + checkState( + table() !=3D null, "KuduIO.write() requires a table name to be s= et via withTable(table)"); + checkState( + formatFn() !=3D null, + "KuduIO.write() requires a format function to be set via withFor= matFn(formatFn)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("masterAddresses", masterAddresses().to= String())); + builder.add(DisplayData.item("tableName", table())); + builder.add(DisplayData.item("formatFn", formatFn().getClass().getCa= nonicalName())); + } + + private class WriteFn extends DoFn { + private final Write spec; + private KuduService.Writer writer; + + WriteFn(Write spec) { + this.spec =3D spec; + } + + @Setup + public void setup() throws KuduException { + writer =3D spec.kuduService().createWriter(spec); + } + + @StartBundle + public void startBundle(StartBundleContext context) throws KuduExcep= tion { + writer.openSession(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws KuduException { + writer.write(c.element()); + } + + @FinishBundle + public void finishBundle() throws Exception { + writer.closeSession(); + } + + @Teardown + public void teardown() throws Exception { + writer.close(); + writer =3D null; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("masterAddresses", spec.masterAddress= es().toString())); + builder.add(DisplayData.item("table", spec.table())); + } + } + } +} diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/Ku= duService.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kud= u/KuduService.java new file mode 100644 index 00000000000..7b14e95de6b --- /dev/null +++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduServi= ce.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kudu; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.kudu.client.KuduException; + +/** An interface for real, mock, or fake implementations of Kudu services.= */ +interface KuduService extends Serializable { + + /** + * Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} = that will read from Kudu + * using the spec from {@link org.apache.beam.sdk.io.kudu.KuduIO.KuduSou= rce}. + */ + BoundedSource.BoundedReader createReader(KuduIO.KuduSource source)= ; + + /** Create a {@link Writer} that writes entities into the KKudu instance= . */ + Writer createWriter(KuduIO.Write spec) throws KuduException; + + /** Returns a list containing a serialized scanner per tablet. */ + List createTabletScanners(KuduIO.Read spec) throws KuduExcept= ion; + + /** Writer for an entity. */ + interface Writer extends AutoCloseable, Serializable { + + /** + * Opens a new session for writing. This must be called exactly once b= efore calling {@link + * #write(Object)}. + */ + void openSession() throws KuduException; + + /** + * Writes the entity to Kudu. A call to {@link #openSession()} must be= made before writing. + * Writes may be asynchronous in which case implementations must surfa= ce errors when the session + * is closed. + */ + void write(T entity) throws KuduException; + + /** Closes the session, surfacing any errors that may have occurred du= ring writing. */ + void closeSession() throws Exception; + } +} diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/Ku= duServiceImpl.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io= /kudu/KuduServiceImpl.java new file mode 100644 index 00000000000..12dc3b12080 --- /dev/null +++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduServi= ceImpl.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kudu; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.kudu.Common; +import org.apache.kudu.Schema; +import org.apache.kudu.client.AbstractKuduScannerBuilder; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.RowError; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; +import org.apache.kudu.client.SessionConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** An implementation of the {@link KuduService} that uses a Kudu instance= . */ +class KuduServiceImpl implements KuduService { + private static final Logger LOG =3D LoggerFactory.getLogger(KuduServiceI= mpl.class); + + @Override + public Writer createWriter(KuduIO.Write spec) throws KuduException { + return new WriterImpl(spec); + } + + @Override + public BoundedSource.BoundedReader createReader(KuduIO.KuduSource source= ) { + return new ReaderImpl(source); + } + + @Override + public List createTabletScanners(KuduIO.Read spec) throws KuduEx= ception { + try (KuduClient client =3D getKuduClient(spec.getMasterAddresses())) { + KuduTable table =3D client.openTable(spec.getTable()); + KuduScanToken.KuduScanTokenBuilder builder =3D client.newScanTokenBu= ilder(table); + configureBuilder(spec, table.getSchema(), builder); + List tokens =3D builder.build(); + return tokens.stream().map(t -> uncheckCall(t::serialize)).collect(C= ollectors.toList()); + } + } + + /** Writer storing an entity into Apache Kudu table. 
*/ + class WriterImpl implements Writer { + private final KuduIO.FormatFunction formatFunction; + private KuduClient client; + private KuduSession session; + private KuduTable table; + + WriterImpl(KuduIO.Write spec) throws KuduException { + checkNotNull(spec.masterAddresses(), "masterAddresses cannot be null= "); + checkNotNull(spec.table(), "table cannot be null"); + this.formatFunction =3D checkNotNull(spec.formatFn(), "formatFn cann= ot be null"); + client =3D + new AsyncKuduClient.AsyncKuduClientBuilder(spec.masterAddresses(= )).build().syncClient(); + table =3D client.openTable(spec.table()); + } + + @Override + public void openSession() throws KuduException { + // errors are collected per session so we align session with the bun= dle + session =3D client.newSession(); + // async flushing as per the official kudu-spark approach + session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKG= ROUND); + } + + @Override + public void write(T entity) throws KuduException { + checkState(session !=3D null, "must call openSession() before writin= g"); + session.apply(formatFunction.apply(new TableAndRecord(table, entity)= )); + } + + @Override + public void closeSession() throws Exception { + try { + session.close(); + if (session.countPendingErrors() > 0) { + LOG.error("At least {} errors occurred writing to Kudu", session= .countPendingErrors()); + RowError[] errors =3D session.getPendingErrors().getRowErrors(); + for (int i =3D 0; errors !=3D null && i < 3 && i < errors.length= ; i++) { + LOG.error("Sample error: {}", errors[i]); + } + throw new Exception( + "At least " + session.countPendingErrors() + " error(s) occu= rred writing to Kudu"); + } + } finally { + session =3D null; + } + } + + @Override + public void close() throws Exception { + client.close(); + client =3D null; + } + } + + /** Bounded reader of an Apache Kudu table. */ + class ReaderImpl extends BoundedSource.BoundedReader { + private final KuduIO.KuduSource source; + private KuduClient client; + private KuduScanner scanner; + private RowResultIterator iter; + private RowResult current; + private long recordsReturned; + + ReaderImpl(KuduIO.KuduSource source) { + this.source =3D source; + } + + @Override + public boolean start() throws IOException { + LOG.debug("Starting Kudu reader"); + client =3D + new AsyncKuduClient.AsyncKuduClientBuilder(source.spec.getMaster= Addresses()) + .build() + .syncClient(); + + if (source.serializedToken !=3D null) { + // tokens available if the source is already split + scanner =3D KuduScanToken.deserializeIntoScanner(source.serialized= Token, client); + } else { + KuduTable table =3D client.openTable(source.spec.getTable()); + KuduScanner.KuduScannerBuilder builder =3D + table.getAsyncClient().syncClient().newScannerBuilder(table); + + configureBuilder(source.spec, table.getSchema(), builder); + scanner =3D builder.build(); + } + + return advance(); + } + + /** + * Returns the current record transformed into the desired type. + * + * @return the current record + * @throws NoSuchElementException If the current does not exist + */ + @Override + public T getCurrent() throws NoSuchElementException { + if (current !=3D null) { + return source.spec.getParseFn().apply(current); + + } else { + throw new NoSuchElementException( + "No current record (Indicates misuse. 
Perhaps advance() was no= t called?)"); + } + } + + @Override + public boolean advance() throws KuduException { + // scanner pages over results, with each page holding an iterator of= records + if (iter =3D=3D null || (!iter.hasNext() && scanner.hasMoreRows())) = { + iter =3D scanner.nextRows(); + } + + if (iter !=3D null && iter.hasNext()) { + current =3D iter.next(); + ++recordsReturned; + return true; + } + + return false; + } + + @Override + public void close() throws IOException { + LOG.debug("Closing reader after reading {} records.", recordsReturne= d); + if (scanner !=3D null) { + scanner.close(); + scanner =3D null; + } + if (client !=3D null) { + client.close(); + client =3D null; + } + } + + @Override + public synchronized KuduIO.KuduSource getCurrentSource() { + return source; + } + } + + /** Creates a new synchronous client. */ + private synchronized KuduClient getKuduClient(List masterAddress= es) { + return new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses).bui= ld().syncClient(); + } + + /** Configures the scanner builder to conform to the spec. */ + private static void configureBuilder( + KuduIO.Read spec, Schema schema, AbstractKuduScannerBuilder buil= der) { + builder.cacheBlocks(true); // as per kudu-spark + if (spec.getBatchSize() !=3D null) { + builder.batchSizeBytes(spec.getBatchSize()); + } + if (spec.getProjectedColumns() !=3D null) { + builder.setProjectedColumnNames(spec.getProjectedColumns()); + } + if (spec.getFaultTolerent() !=3D null) { + builder.setFaultTolerant(spec.getFaultTolerent()); + } + if (spec.getSerializablePredicates() !=3D null) { + for (Common.ColumnPredicatePB predicate : spec.getSerializablePredic= ates()) { + builder.addPredicate(KuduPredicate.fromPB(schema, predicate)); + } + } + } + + /** Wraps the callable converting checked to RuntimeExceptions. */ + private static T uncheckCall(Callable callable) { + try { + return callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/Ta= bleAndRecord.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/= kudu/TableAndRecord.java new file mode 100644 index 00000000000..06db175d082 --- /dev/null +++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/TableAndR= ecord.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kudu; + +import org.apache.kudu.client.KuduTable; + +/** + * A wrapper for a {@link KuduTable} and the {@link T} representing a type= d record. 
+ * + * @param The type of the record + */ +public class TableAndRecord { + private final KuduTable table; + private final T record; + + public TableAndRecord(KuduTable table, T record) { + this.table =3D table; + this.record =3D record; + } + + public KuduTable getTable() { + return table; + } + + public T getRecord() { + return record; + } +} diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/pa= ckage-info.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/ku= du/package-info.java new file mode 100644 index 00000000000..cece7e2efdf --- /dev/null +++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/package-i= nfo.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing from/to Apache Kudu. + * + * @see org.apache.beam.sdk.io.kudu.KuduIO + */ +package org.apache.beam.sdk.io.kudu; diff --git a/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/Ku= duIOIT.java b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/K= uduIOIT.java new file mode 100644 index 00000000000..ba2e9d6807b --- /dev/null +++ b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOIT.= java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kudu; + +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.COL_ID; +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.COL_NAME; +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.GenerateUpsert; +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.SCHEMA; +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.createTableOptions= ; +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.rowCount; +import static org.hamcrest.Matchers.equalTo; + +import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.RowResult; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A test of {@link org.apache.beam.sdk.io.kudu.KuduIO} on an independent = Kudu instance. + * + *

+ * <p>This test requires a running instance of Kudu. Pass in connection information using
+ * PipelineOptions:
+ *
+ * <pre>
+ *  ./gradlew integrationTest -p sdks/java/io/kudu -DintegrationTestPipelineOptions='[
+ *    "--kuduMasterAddresses=127.0.0.1",
+ *    "--kuduTable=beam-integration-test",
+ *    "--numberOfRecords=100000" ]'
+ *    --tests org.apache.beam.sdk.io.kudu.KuduIOIT
+ *    -DintegrationTestRunner=direct
+ * </pre>
+ *
+ * <p>To start a Kudu server in docker you can use the following:
+ *
+ * <pre>
+ *   docker pull usuresearch/apache-kudu
+ *   docker run -d --rm --name apache-kudu -p 7051:7051 \
+ *     -p 7050:7050 -p 8051:8051 -p 8050:8050 usuresearch/apache-kudu
+ * </pre>
+ *
+ * <p>See the usuresearch/apache-kudu image documentation for information about this image.
+ *
+ * <p>Once running you may need to visit the masters list and copy the host
+ * (e.g. host: "e94929167e2a"), adding it to your etc/hosts file pointing to localhost, e.g.:
+ *
+ * <pre>
+ *   127.0.0.1 localhost e94929167e2a
+ * 
+ */ +public class KuduIOIT { + private static final Logger LOG =3D LoggerFactory.getLogger(KuduIOIT.cla= ss); + + /** KuduIOIT options. */ + public interface KuduPipelineOptions extends IOTestPipelineOptions { + @Description("Kudu master addresses (comma separated address list)") + @Default.String("127.0.0.1:7051") + String getKuduMasterAddresses(); + + void setKuduMasterAddresses(String masterAddresses); + + @Description("Kudu table") + @Default.String("beam-integration-test") + String getKuduTable(); + + void setKuduTable(String name); + } + + private static KuduPipelineOptions options; + private static KuduClient client; + private static KuduTable kuduTable; + + @Rule public final TestPipeline writePipeline =3D TestPipeline.create(); + @Rule public TestPipeline readPipeline =3D TestPipeline.create(); + @Rule public ExpectedException thrown =3D ExpectedException.none(); + + @BeforeClass + public static void setUp() throws KuduException { + PipelineOptionsFactory.register(KuduPipelineOptions.class); + options =3D TestPipeline.testingPipelineOptions().as(KuduPipelineOptio= ns.class); + + // synchronous operations + client =3D + new AsyncKuduClient.AsyncKuduClientBuilder(options.getKuduMasterAd= dresses()) + .build() + .syncClient(); + + if (client.tableExists(options.getKuduTable())) { + client.deleteTable(options.getKuduTable()); + } + + kuduTable =3D + client.createTable(options.getKuduTable(), KuduTestUtils.SCHEMA, c= reateTableOptions()); + } + + @AfterClass + public static void tearDown() throws Exception { + try { + if (client.tableExists(options.getKuduTable())) { + client.deleteTable(options.getKuduTable()); + } + } finally { + client.close(); + } + } + + @Test + public void testWriteThenRead() throws Exception { + runWrite(); + runReadAll(); + readPipeline =3D TestPipeline.create(); + runReadProjectedColumns(); + readPipeline =3D TestPipeline.create(); + runReadWithPredicates(); + } + + private void runReadAll() { + // Lambdas erase too much type information so specify the coder + PCollection output =3D + readPipeline.apply( + KuduIO.read() + .withMasterAddresses(options.getKuduMasterAddresses()) + .withTable(options.getKuduTable()) + .withParseFn( + (SerializableFunction) input -> inp= ut.getString(COL_NAME)) + .withCoder(StringUtf8Coder.of())); + PAssert.thatSingleton(output.apply("Count", Count.globally())) + .isEqualTo((long) options.getNumberOfRecords()); + + readPipeline.run().waitUntilFinish(); + } + + private void runReadWithPredicates() { + PCollection output =3D + readPipeline.apply( + "Read with predicates", + KuduIO.read() + .withMasterAddresses(options.getKuduMasterAddresses()) + .withTable(options.getKuduTable()) + .withParseFn( + (SerializableFunction) input -> inp= ut.getString(COL_NAME)) + .withPredicates( + Arrays.asList( + KuduPredicate.newComparisonPredicate( + SCHEMA.getColumn(COL_ID), KuduPredicate.Compar= isonOp.GREATER_EQUAL, 2), + KuduPredicate.newComparisonPredicate( + SCHEMA.getColumn(COL_ID), KuduPredicate.Compar= isonOp.LESS, 7))) + .withCoder(StringUtf8Coder.of())); + + output.apply(Count.globally()); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqual= To((long) 5); + + readPipeline.run().waitUntilFinish(); + } + + /** + * Tests that the projected columns are passed down to the Kudu scanner = by attempting to read the + * {@value KuduTestUtils#COL_NAME} in the parse function when it is omit= ted. 
+ */ + private void runReadProjectedColumns() { + thrown.expect(IllegalArgumentException.class); + readPipeline + .apply( + "Read with projected columns", + KuduIO.read() + .withMasterAddresses(options.getKuduMasterAddresses()) + .withTable(options.getKuduTable()) + .withParseFn( + (SerializableFunction) input -> inp= ut.getString(COL_NAME)) + .withProjectedColumns(Collections.singletonList(COL_ID))) = // COL_NAME excluded + .setCoder(StringUtf8Coder.of()); + readPipeline.run().waitUntilFinish(); + } + + private void runWrite() throws Exception { + writePipeline + .apply("Generate sequence", GenerateSequence.from(0).to(options.ge= tNumberOfRecords())) + .apply( + "Write records to Kudu", + KuduIO.write() + .withMasterAddresses(options.getKuduMasterAddresses()) + .withTable(options.getKuduTable()) + .withFormatFn(new GenerateUpsert())); + writePipeline.run().waitUntilFinish(); + + Assert.assertThat( + "Wrong number of records in table", + rowCount(kuduTable), + equalTo(options.getNumberOfRecords())); + } +} diff --git a/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/Ku= duIOTest.java b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu= /KuduIOTest.java new file mode 100644 index 00000000000..a39b87b4e22 --- /dev/null +++ b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOTes= t.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kudu; + +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.COL_ID; +import static org.apache.beam.sdk.io.kudu.KuduTestUtils.GenerateUpsert; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.RowResult; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A test of {@link KuduIO} using fake Kudu services. + * + *

Since Kudu is written in C++ it does not currently lend itself to ea= sy unit tests from a Java + * environment. The Kudu project is actively working on a solution for thi= s (see KUDU-2411)= which will be used in the + * future. In the meantime, only rudimentary tests exist here, with the pr= eferred testing being + * carried out in {@link KuduIOIT}. + */ +public class KuduIOTest { + private static final Logger LOG =3D LoggerFactory.getLogger(KuduIOTest.c= lass); + + @Rule public final TestPipeline writePipeline =3D TestPipeline.create(); + @Rule public final TestPipeline readPipeline =3D TestPipeline.create(); + @Rule public ExpectedException thrown =3D ExpectedException.none(); + + @Rule public final transient ExpectedLogs expectedWriteLogs =3D Expected= Logs.none(FakeWriter.class); + @Rule public final transient ExpectedLogs expectedReadLogs =3D ExpectedL= ogs.none(FakeReader.class); + + private KuduService mockReadService; + private KuduService mockWriteService; + + private final int numberRecords =3D 10; + private int targetParallelism =3D 3; // determined by the runner, but di= rect has min of 3 + + @Before + public void setUp() throws Exception { + mockReadService =3D mock(KuduService.class, withSettings().serializabl= e()); + mockWriteService =3D mock(KuduService.class, withSettings().serializab= le()); + } + + /** + * Tests the read path using a {@link FakeReader}. The {@link KuduServic= e} is mocked to simulate 4 + * tablets and fake the encoding of a scanner for each tablet. The test = verifies that the {@link + * KuduIO} correctly splits into 4 sources and instantiates a reader for= each, and that the + * correct number of records are read. + */ + @Test + public void testRead() throws KuduException { + when(mockReadService.createReader(any())).thenAnswer(new FakeReaderAns= wer()); + // Simulate the equivalent of Kudu providing an encoded scanner per ta= blet. Here we encode + // a range which the fake reader will use to simulate a single tablet = read. + List fakeScanners =3D + Arrays.asList( + ByteBuffer.allocate(8).putInt(0).putInt(25).array(), + ByteBuffer.allocate(8).putInt(25).putInt(50).array(), + ByteBuffer.allocate(8).putInt(50).putInt(75).array(), + ByteBuffer.allocate(8).putInt(75).putInt(100).array()); + when(mockReadService.createTabletScanners(any())).thenReturn(fakeScann= ers); + + PCollection output =3D + readPipeline.apply( + KuduIO.read() + .withMasterAddresses("mock") + .withTable("Table") + // the fake reader only deals with a single int + .withParseFn( + (SerializableFunction) input -> in= put.getInt(COL_ID)) + .withKuduService(mockReadService) + .withCoder(BigEndianIntegerCoder.of())); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqual= To((long) 100); + + readPipeline.run().waitUntilFinish(); + + // check that the fake tablet ranges were read + expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 0= , 25)); + expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 2= 5, 50)); + expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 5= 0, 75)); + expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 7= 5, 100)); + } + + /** + * Test the write path using a {@link FakeWriter} and verifying the expe= cted log statements are + * written. This test ensures that the {@link KuduIO} correctly respects= parallelism by + * deserializes writers and that each writer is opening and closing Kudu= sessions. 
+ */ + @Test + public void testWrite() throws Exception { + when(mockWriteService.createWriter(any())).thenReturn(new FakeWriter()= ); + + writePipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberReco= rds)) + .apply( + "Write records to Kudu", + KuduIO.write() + .withMasterAddresses("ignored") + .withTable("ignored") + .withFormatFn(new GenerateUpsert()) // ignored (mocking Op= eration is pointless) + .withKuduService(mockWriteService)); + writePipeline.run().waitUntilFinish(); + + for (int i =3D 1; i <=3D targetParallelism + 1; i++) { + expectedWriteLogs.verifyDebug(String.format(FakeWriter.LOG_OPEN_SESS= ION, i)); + expectedWriteLogs.verifyDebug( + String.format(FakeWriter.LOG_WRITE, i)); // at least one per wri= ter + expectedWriteLogs.verifyDebug(String.format(FakeWriter.LOG_CLOSE_SES= SION, i)); + } + // verify all entries written + for (int n =3D 0; n > numberRecords; n++) { + expectedWriteLogs.verifyDebug( + String.format(FakeWriter.LOG_WRITE_VALUE, n)); // at least one p= er writer + } + } + + /** + * A fake writer which logs operations using a unique id for the writer = instance. The initial + * writer is created with and id of 0 and each deserialized instance wil= l receive a unique integer + * id. + * + *

This writer allows tests to verify that sessions are opened and cl= osed and the entities are + * passed to the write operation. However, the {@code formatFn} is ignor= ed as the mocking required + * to replicate the {@link Operation} would render it a meaningless chec= k. + */ + private static class FakeWriter implements KuduService.Writer { + private static final Logger LOG =3D LoggerFactory.getLogger(FakeWriter= .class); + + static final String LOG_OPEN_SESSION =3D "FakeWriter[%d] openSession"; + static final String LOG_WRITE =3D "FakeWriter[%d] write"; + static final String LOG_WRITE_VALUE =3D "FakeWriter value[%d]"; + static final String LOG_CLOSE_SESSION =3D "FakeWriter[%d] closeSession= "; + + // share a counter across instances to uniquely identify the writers + private static final AtomicInteger counter =3D new AtomicInteger(0); + private transient int id =3D 0; // set on deserialization + + @Override + public void openSession() { + LOG.debug(String.format(LOG_OPEN_SESSION, id)); + } + + @Override + public void write(Long entity) { + LOG.debug(String.format(LOG_WRITE, entity)); + LOG.debug(String.format(LOG_WRITE_VALUE, entity)); + } + + @Override + public void closeSession() { + LOG.debug(String.format(LOG_CLOSE_SESSION, id)); + } + + @Override + public void close() { + // called on teardown which give no guarantees + LOG.debug("FakeWriter[{}] close {}", id); + } + + /** Sets the unique id on deserialzation using the shared counter. */ + private void readObject(ObjectInputStream in) throws IOException, Clas= sNotFoundException { + in.defaultReadObject(); + id =3D counter.incrementAndGet(); + } + } + + /** + * A fake reader which will return ascending integers from either 0 to 9= 9 unless or using the + * range specified in the serlialized token in the source. This is fakin= g the behavior of the + * scanner serialization in Kudu. 
+ */ + private static class FakeReader extends BoundedSource.BoundedReader { + private static final Logger LOG =3D LoggerFactory.getLogger(FakeReader= .class); + + static final String LOG_SET_RANGE =3D "FakeReader serializedToken give= s range %d - %d"; + + private final KuduIO.KuduSource source; + private int lowerInclusive =3D 0; + private int upperExclusive =3D 100; + private int current =3D 0; + private RowResult mockRecord =3D mock(RowResult.class); // simulate a = row from Kudu + + FakeReader(KuduIO.KuduSource source) { + this.source =3D source; + // any request for an int from the mocked row will return the curren= t value + when(mockRecord.getInt(any())).thenAnswer((Answer) invocati= on -> current); + } + + @Override + public boolean start() { + // simulate the deserialization of a tablet scanner + if (source.serializedToken !=3D null) { + ByteBuffer bb =3D ByteBuffer.wrap(source.serializedToken); + lowerInclusive =3D bb.getInt(); + upperExclusive =3D bb.getInt(); + LOG.debug(String.format(LOG_SET_RANGE, lowerInclusive, upperExclus= ive)); + } + current =3D lowerInclusive; + return true; + } + + @Override + public boolean advance() { + current++; + return current < upperExclusive; + } + + @Override + public Integer getCurrent() { + return source.spec.getParseFn().apply(mockRecord); + } + + @Override + public void close() {} + + @Override + public BoundedSource getCurrentSource() { + return source; + } + } + + // required to be a static class for serialization + static class FakeReaderAnswer implements Answer, Serializabl= e { + @Override + public FakeReader answer(InvocationOnMock invocation) { + Object[] args =3D invocation.getArguments(); + return new FakeReader((KuduIO.KuduSource) args[0]); + } + } +} diff --git a/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/Ku= duTestUtils.java b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/k= udu/KuduTestUtils.java new file mode 100644 index 00000000000..285f1994c9a --- /dev/null +++ b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduTestU= tils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied= . + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kudu; + +import com.google.common.collect.ImmutableList; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.Upsert; + +/** Utilities for Kudu tests. 
+class KuduTestUtils {
+  static final String COL_ID = "id";
+  static final String COL_NAME = "name";
+
+  static final Schema SCHEMA =
+      new Schema(
+          ImmutableList.of(
+              new ColumnSchema.ColumnSchemaBuilder(COL_ID, Type.INT64).key(true).build(),
+              new ColumnSchema.ColumnSchemaBuilder(COL_NAME, Type.STRING)
+                  .nullable(false)
+                  .desiredBlockSize(4096)
+                  .encoding(ColumnSchema.Encoding.PLAIN_ENCODING)
+                  .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION)
+                  .build()));
+
+  static CreateTableOptions createTableOptions() {
+    return new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of(COL_ID))
+        .setNumReplicas(1);
+  }
+
+  /** Creates an Upsert Operation that matches the schema for each input. */
+  static class GenerateUpsert implements KuduIO.FormatFunction<Long> {
+    @Override
+    public Operation apply(TableAndRecord<Long> input) {
+      Upsert upsert = input.getTable().newUpsert();
+      PartialRow row = upsert.getRow();
+      row.addLong(COL_ID, input.getRecord());
+      row.addString(COL_NAME, input.getRecord() + ": name");
+      return upsert;
+    }
+  }
+
+  /** Returns the count of rows for the given table. */
+  static int rowCount(KuduTable table) throws KuduException {
+    KuduScanner scanner = table.getAsyncClient().syncClient().newScannerBuilder(table).build();
+    try {
+      int rowCount = 0;
+      while (scanner.hasMoreRows()) {
+        rowCount += scanner.nextRows().getNumRows();
+      }
+      return rowCount;
+    } finally {
+      scanner.close();
+    }
+  }
+}
diff --git a/sdks/java/io/kudu/src/test/resources/log4j-test.properties b/sdks/java/io/kudu/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..4c74d85d7c6
--- /dev/null
+++ b/sdks/java/io/kudu/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/settings.gradle b/settings.gradle
index 39d62d0ee89..01ea889a542 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -138,6 +138,8 @@ include "beam-sdks-java-io-kafka"
 project(":beam-sdks-java-io-kafka").dir = file("sdks/java/io/kafka")
 include "beam-sdks-java-io-kinesis"
 project(":beam-sdks-java-io-kinesis").dir = file("sdks/java/io/kinesis")
+include "beam-sdks-java-io-kudu"
+project(":beam-sdks-java-io-kudu").dir = file("sdks/java/io/kudu")
 include "beam-sdks-java-io-mongodb"
 project(":beam-sdks-java-io-mongodb").dir = file("sdks/java/io/mongodb")
 include "beam-sdks-java-io-mqtt"

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 129456)
            Time Spent: 4.5h  (was: 4h 20m)

> Add KuduIO
> ----------
>
>                 Key: BEAM-2661
>                 URL: https://issues.apache.org/jira/browse/BEAM-2661
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Tim Robertson
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> New IO for Apache Kudu ([https://kudu.apache.org/overview.html]).
> This work is in progress [on this branch|https://github.com/timrobertson100/beam/tree/BEAM-2661-KuduIO], with the design aspects documented below.
> h2. The API
> The {{KuduIO}} API requires the user to provide a function that converts each object into a Kudu operation (a usage sketch is given after the list below). This is similar to {{JdbcIO}} but different from others such as {{HBaseIO}}, which requires a preceding transform to convert elements into the mutations to apply. It was originally intended to copy the {{HBaseIO}} approach, but this was not possible:
> # The Kudu [Operation|https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html] is a fat class and a subclass of {{KuduRpc}}. It holds RPC logic, callbacks and a Kudu client. Because of this the {{Operation}} does not serialize, and furthermore the logic for encoding the operations (Insert, Upsert etc.) in the Kudu Java API is one way only (no decode) because the server is written in C++.
> # An alternative could be to introduce a new object to Beam (e.g. {{o.a.b.sdk.io.kudu.KuduOperation}}) to enable {{PCollection<KuduOperation>}}. This was considered but was discounted because:
> ## It is not a familiar API to those already knowing Kudu.
> ## It still requires serialization and deserialization of the operations. Using the existing Kudu approach of serializing into compact byte arrays would require a decoder along the lines of [this almost complete example|https://gist.github.com/timrobertson100/df77d1337ba8f5609319751ee7c6e01e]. This is possible but has fragilities, given the Kudu code itself continues to evolve.
> ## It keeps the codebase in Beam trivial to maintain by deferring the object-to-mutation mapping to within the {{KuduIO}} transform. {{JdbcIO}} gives us the precedent to do this.
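> As referenced above, a minimal usage sketch of this approach follows. It is illustrative only: the builder methods ({{withMasterAddresses}}, {{withTable}}, {{withFormatFn}}), the master address and the table name are assumptions based on the in-progress branch and may change before merge.
> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kudu.KuduIO;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.kudu.client.PartialRow;
> import org.apache.kudu.client.Upsert;
>
> public class KuduWriteSketch {
>   public static void main(String[] args) {
>     Pipeline pipeline = Pipeline.create();
>     pipeline
>         .apply(Create.of(1L, 2L, 3L))
>         .apply(
>             KuduIO.<Long>write()
>                 .withMasterAddresses("kudu-master:7051") // assumed master address
>                 .withTable("my_table") // assumed existing table with "id" and "name" columns
>                 // the user-supplied function mapping each element to a Kudu Operation
>                 .withFormatFn(
>                     input -> {
>                       Upsert upsert = input.getTable().newUpsert();
>                       PartialRow row = upsert.getRow();
>                       row.addLong("id", input.getRecord());
>                       row.addString("name", input.getRecord() + ": name");
>                       return upsert;
>                     }));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The element-to-{{Operation}} mapping stays inside the function handed to {{KuduIO}}, following the {{JdbcIO}} precedent noted above.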
> h2. Testing framework
> {{Kudu}} is written in C++. While a [TestMiniKuduCluster|https://github.com/cloudera/kudu/blob/master/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java] does exist in Java, it requires native binaries to be available for the target environment, which is not portable (edit: this is now a [work in progress|https://issues.apache.org/jira/browse/KUDU-2411] in Kudu). Therefore we opt for the following:
> # Unit tests will use a mock Kudu client.
> # Integration tests will cover the full functionality of the {{KuduIO}} and use a Docker-based Kudu instance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)