beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-2661) Add KuduIO
Date Tue, 31 Jul 2018 19:14:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2661?focusedWorklogId=129456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129456 ]

ASF GitHub Bot logged work on BEAM-2661:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/18 19:13
            Start Date: 31/Jul/18 19:13
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6021: [BEAM-2661] Adds KuduIO
URL: https://github.com/apache/beam/pull/6021
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/kudu/build.gradle b/sdks/java/io/kudu/build.gradle
new file mode 100644
index 00000000000..5457ec7dfdd
--- /dev/null
+++ b/sdks/java/io/kudu/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Kudu"
+ext.summary = "Library to read and write from/to Kudu"
+
+test {
+  systemProperty "log4j.configuration", "log4j-test.properties"
+  jvmArgs "-XX:-UseGCOverheadLimit"
+  if (System.getProperty("beamSurefireArgline")) {
+    jvmArgs System.getProperty("beamSurefireArgline")
+  }
+}
+
+def kudu_version = "1.4.0"
+
+dependencies {
+  compile library.java.guava
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+  shadow "org.apache.kudu:kudu-client:$kudu_version"
+  shadow library.java.slf4j_api
+  testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest")
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  testCompile library.java.junit
+}
+
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
new file mode 100644
index 00000000000..5694946d3dd
--- /dev/null
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded source and sink for Kudu.
+ *
+ * <p>For more information, see the online documentation at <a
+ * href="https://kudu.apache.org/">Kudu</a>.
+ *
+ * <h3>Reading from Kudu</h3>
+ *
+ * <p>{@code KuduIO} provides a source to read and returns a bounded collection of entities as
+ * {@code PCollection<T>}. An entity is built by parsing a Kudu {@link RowResult} using the
+ * provided {@code SerializableFunction<RowResult, T>}.
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(
+ *     KuduIO.<String>read()
+ *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ *         .withTable("table")
+ *         .withParseFn(
+ *             (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+ *         .withCoder(StringUtf8Coder.of()));
+ *     // above options illustrate a typical minimum set, returns PCollection<String>
+ * }</pre>
+ *
+ * <p>{@code withCoder(...)} may be omitted if it can be inferred from the {@link CoderRegistry}.
+ * However, when using a Lambda Expression or an anonymous inner class to define the function, type
+ * erasure will prohibit this. In such cases you are required to explicitly set the coder as in the
+ * above example.
+ *
+ * <p>Optionally, you can provide {@code withPredicates(...)} to apply a query to filter rows from
+ * the Kudu table.
+ *
+ * <p>Optionally, you can provide {@code withProjectedColumns(...)} to limit the columns returned
+ * from the Kudu scan to improve performance. The columns required in the {@code ParseFn} must be
+ * declared in the projected columns.
+ *
+ * <p>Optionally, you can provide {@code withBatchSize(...)} to set the number of bytes returned
+ * from the Kudu scanner in each batch.
+ *
+ * <p>Optionally, you can provide {@code withFaultTolerent(...)} to instruct the read scan to
+ * resume on another tablet server if the current server fails.
+ *
+ * <h3>Writing to Kudu</h3>
+ *
+ * <p>The Kudu sink executes a set of operations on a single table. It takes as input a {@link
+ * PCollection PCollection&lt;T&gt;} and a {@link FormatFunction FormatFunction&lt;T&gt;} which is responsible for
+ * converting the input into an idempotent transformation on a row.
+ *
+ * <p>To configure a Kudu sink, you must supply the Kudu master addresses, the table name and a
+ * {@link FormatFunction} to convert the input records, for example:
+ *
+ * <pre>{@code
+ * PCollection<MyType> data = ...;
+ * FormatFunction<MyType> fn = ...;
+ *
+ * data.apply("write",
+ *     KuduIO.write()
+ *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ *         .withTable("table")
+ *         .withFormatFn(fn));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * {@code KuduIO} does not support authentication in this release.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class KuduIO {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduIO.class);
+
+  private KuduIO() {}
+
+  public static <T> Read<T> read() {
+    return new AutoValue_KuduIO_Read.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
+  }
+
+  public static <T> Write<T> write() {
+    return new AutoValue_KuduIO_Write.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
+  }
+
+  /**
+   * An interface used by the KuduIO Write to convert an input record into an Operation to apply as
+   * a mutation in Kudu.
+   */
+  @FunctionalInterface
+  public interface FormatFunction<T> extends SerializableFunction<TableAndRecord<T>, Operation> {}
+
+  /** Implementation of {@link KuduIO#read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract List<String> getMasterAddresses();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract Integer getBatchSize();
+
+    @Nullable
+    abstract List<String> getProjectedColumns();
+
+    @Nullable
+    abstract List<Common.ColumnPredicatePB> getSerializablePredicates();
+
+    @Nullable
+    abstract Boolean getFaultTolerent();
+
+    @Nullable
+    abstract SerializableFunction<RowResult, T> getParseFn();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract KuduService<T> getKuduService();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMasterAddresses(List<String> masterAddresses);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setBatchSize(Integer batchSize);
+
+      abstract Builder<T> setProjectedColumns(List<String> projectedColumns);
+
+      abstract Builder<T> setSerializablePredicates(
+          List<Common.ColumnPredicatePB> serializablePredicates);
+
+      abstract Builder<T> setFaultTolerent(Boolean faultTolerent);
+
+      abstract Builder<T> setParseFn(SerializableFunction<RowResult, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setKuduService(KuduService<T> kuduService);
+
+      abstract Read<T> build();
+    }
+
+    @VisibleForTesting
+    Coder<T> inferCoder(CoderRegistry coderRegistry) {
+      try {
+        return getCoder() != null
+            ? getCoder()
+            : coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn ("
+                + TypeDescriptors.outputOf(getParseFn())
+                + "). Specify it explicitly using withCoder().",
+            e);
+      }
+    }
+
+    /** Reads from the Kudu cluster on the specified master addresses. */
+    public Read<T> withMasterAddresses(String masterAddresses) {
+      checkArgument(masterAddresses != null, "masterAddresses cannot be null or empty");
+      return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
+    }
+
+    /** Reads from the specified table. */
+    public Read<T> withTable(String table) {
+      checkArgument(table != null, "table cannot be null");
+      return builder().setTable(table).build();
+    }
+
+    /** Provides the function to parse a row from Kudu into the typed object. */
+    public Read<T> withParseFn(SerializableFunction<RowResult, T> parseFn) {
+      checkArgument(parseFn != null, "parseFn cannot be null");
+      return builder().setParseFn(parseFn).build();
+    }
+
+    /** Filters the rows read from Kudu using the given predicates. */
+    public Read<T> withPredicates(List<KuduPredicate> predicates) {
+      checkArgument(predicates != null, "predicates cannot be null");
+      // reuse the kudu protobuf serialization mechanism
+      List<Common.ColumnPredicatePB> serializablePredicates =
+          predicates.stream().map(KuduPredicate::toPB).collect(Collectors.toList());
+      return builder().setSerializablePredicates(serializablePredicates).build();
+    }
+
+    /** Filters the columns read from the table to include only those specified. */
+    public Read<T> withProjectedColumns(List<String> projectedColumns) {
+      checkArgument(projectedColumns != null, "projectedColumns cannot be null");
+      return builder().setProjectedColumns(projectedColumns).build();
+    }
+
+    /** Reads from the table in batches of the specified size. */
+    public Read<T> withBatchSize(int batchSize) {
+      checkArgument(batchSize >= 0, "batchSize must not be negative");
+      return builder().setBatchSize(batchSize).build();
+    }
+
+    /**
+     * Instructs the read scan to resume a scan on another tablet server if the current server fails
+     * and faultTolerant is set to true.
+     */
+    public Read<T> withFaultTolerent(boolean faultTolerent) {
+      return builder().setFaultTolerent(faultTolerent).build();
+    }
+
+    /**
+     * Sets a {@link Coder} for the result of the parse function. This may be required if a coder
+     * can not be inferred automatically.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder cannot be null");
+      return builder().setCoder(coder).build();
+    }
+
+    /** Specify an instance of {@link KuduService} used to connect and read from Kudu. */
+    @VisibleForTesting
+    Read<T> withKuduService(KuduService<T> kuduService) {
+      checkArgument(kuduService != null, "kuduService cannot be null");
+      return builder().setKuduService(kuduService).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      Pipeline p = input.getPipeline();
+      final Coder<T> coder = inferCoder(p.getCoderRegistry());
+      return input.apply(org.apache.beam.sdk.io.Read.from(new KuduSource<>(this, coder, null)));
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(
+          getMasterAddresses() != null,
+          "KuduIO.read() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
+      checkState(
+          getTable() != null,
+          "KuduIO.read() requires a table name to be set via withTableName(tableName)");
+      checkState(
+          getParseFn() != null,
+          "KuduIO.read() requires a parse function to be set via withParseFn(parseFn)");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("masterAddresses", getMasterAddresses().toString()));
+      builder.add(DisplayData.item("table", getTable()));
+    }
+  }
+
+  static class KuduSource<T> extends BoundedSource {
+    final Read<T> spec;
+    private final Coder<T> coder;
+    @Nullable byte[] serializedToken; // only during a split
+
+    KuduSource(Read spec, Coder<T> coder, byte[] serializedToken) {
+      this.spec = spec;
+      this.coder = coder;
+      this.serializedToken = serializedToken;
+    }
+
+    // A Kudu source can be split once only providing a source per tablet
+    @Override
+    public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options)
+        throws KuduException {
+      if (serializedToken != null) {
+        return Collections.singletonList(this); // we are already a split
+
+      } else {
+        Stream<BoundedSource<T>> sources =
+            spec.getKuduService()
+                .createTabletScanners(spec)
+                .stream()
+                .map(s -> new KuduIO.KuduSource<T>(spec, spec.getCoder(), s));
+        return sources.collect(Collectors.toList());
+      }
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) {
+      return 0; // Kudu does not expose tablet sizes
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions options) {
+      return spec.getKuduService().createReader(this);
+    }
+
+    @Override
+    public Coder<T> getOutputCoder() {
+      return coder;
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes to Kudu. See the class-level Javadoc on {@link KuduIO} for
+   * more information.
+   *
+   * @see KuduIO
+   */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract List<String> masterAddresses();
+
+    @Nullable
+    abstract String table();
+
+    @Nullable
+    abstract FormatFunction<T> formatFn();
+
+    @Nullable
+    abstract KuduService<T> kuduService();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMasterAddresses(List<String> masterAddresses);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setFormatFn(FormatFunction<T> formatFn);
+
+      abstract Builder<T> setKuduService(KuduService<T> kuduService);
+
+      abstract Write<T> build();
+    }
+
+    /** Writes to the Kudu cluster on the specified master addresses. */
+    public Write withMasterAddresses(String masterAddresses) {
+      checkArgument(masterAddresses != null, "masterAddresses cannot be null or empty");
+      return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
+    }
+
+    /** Writes to the specified table. */
+    public Write withTable(String table) {
+      checkArgument(table != null, "table cannot be null");
+      return builder().setTable(table).build();
+    }
+
+    /** Writes using the given function to create the mutation operations from the input. */
+    public Write withFormatFn(FormatFunction<T> formatFn) {
+      checkArgument(formatFn != null, "formatFn cannot be null");
+      return builder().setFormatFn(formatFn).build();
+    }
+
+    /** Specify the {@link KuduService} used to connect and write into the Kudu table. */
+    @VisibleForTesting
+    Write<T> withKuduService(KuduService<T> kuduService) {
+      checkArgument(kuduService != null, "kuduService cannot be null");
+      return builder().setKuduService(kuduService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(
+          masterAddresses() != null,
+          "KuduIO.write() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
+      checkState(
+          table() != null, "KuduIO.write() requires a table name to be set via withTable(table)");
+      checkState(
+          formatFn() != null,
+          "KuduIO.write() requires a format function to be set via withFormatFn(formatFn)");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("masterAddresses", masterAddresses().toString()));
+      builder.add(DisplayData.item("tableName", table()));
+      builder.add(DisplayData.item("formatFn", formatFn().getClass().getCanonicalName()));
+    }
+
+    private class WriteFn extends DoFn<T, Void> {
+      private final Write<T> spec;
+      private KuduService.Writer writer;
+
+      WriteFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws KuduException {
+        writer = spec.kuduService().createWriter(spec);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext context) throws KuduException {
+        writer.openSession();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws KuduException {
+        writer.write(c.element());
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        writer.closeSession();
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        writer.close();
+        writer = null;
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.add(DisplayData.item("masterAddresses", spec.masterAddresses().toString()));
+        builder.add(DisplayData.item("table", spec.table()));
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduService.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduService.java
new file mode 100644
index 00000000000..7b14e95de6b
--- /dev/null
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.kudu.client.KuduException;
+
+/** An interface for real, mock, or fake implementations of Kudu services. */
+interface KuduService<T> extends Serializable {
+
+  /**
+   * Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} that will read from Kudu
+   * using the spec from {@link org.apache.beam.sdk.io.kudu.KuduIO.KuduSource}.
+   */
+  BoundedSource.BoundedReader<T> createReader(KuduIO.KuduSource<T> source);
+
+  /** Creates a {@link Writer} that writes entities into the Kudu instance. */
+  Writer createWriter(KuduIO.Write<T> spec) throws KuduException;
+
+  /** Returns a list containing a serialized scanner per tablet. */
+  List<byte[]> createTabletScanners(KuduIO.Read<T> spec) throws KuduException;
+
+  /** Writer for an entity. */
+  interface Writer<T> extends AutoCloseable, Serializable {
+
+    /**
+     * Opens a new session for writing. This must be called exactly once before calling {@link
+     * #write(Object)}.
+     */
+    void openSession() throws KuduException;
+
+    /**
+     * Writes the entity to Kudu. A call to {@link #openSession()} must be made before writing.
+     * Writes may be asynchronous in which case implementations must surface errors when the session
+     * is closed.
+     */
+    void write(T entity) throws KuduException;
+
+    /** Closes the session, surfacing any errors that may have occurred during writing. */
+    void closeSession() throws Exception;
+  }
+}
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduServiceImpl.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduServiceImpl.java
new file mode 100644
index 00000000000..12dc3b12080
--- /dev/null
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduServiceImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.kudu.Common;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.AbstractKuduScannerBuilder;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An implementation of the {@link KuduService} that uses a Kudu instance. */
+class KuduServiceImpl<T> implements KuduService<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduServiceImpl.class);
+
+  @Override
+  public Writer createWriter(KuduIO.Write<T> spec) throws KuduException {
+    return new WriterImpl(spec);
+  }
+
+  @Override
+  public BoundedSource.BoundedReader createReader(KuduIO.KuduSource source) {
+    return new ReaderImpl(source);
+  }
+
+  @Override
+  public List<byte[]> createTabletScanners(KuduIO.Read spec) throws KuduException {
+    try (KuduClient client = getKuduClient(spec.getMasterAddresses())) {
+      KuduTable table = client.openTable(spec.getTable());
+      KuduScanToken.KuduScanTokenBuilder builder = client.newScanTokenBuilder(table);
+      configureBuilder(spec, table.getSchema(), builder);
+      List<KuduScanToken> tokens = builder.build();
+      return tokens.stream().map(t -> uncheckCall(t::serialize)).collect(Collectors.toList());
+    }
+  }
+
+  /** Writer storing an entity into Apache Kudu table. */
+  class WriterImpl implements Writer<T> {
+    private final KuduIO.FormatFunction<T> formatFunction;
+    private KuduClient client;
+    private KuduSession session;
+    private KuduTable table;
+
+    WriterImpl(KuduIO.Write<T> spec) throws KuduException {
+      checkNotNull(spec.masterAddresses(), "masterAddresses cannot be null");
+      checkNotNull(spec.table(), "table cannot be null");
+      this.formatFunction = checkNotNull(spec.formatFn(), "formatFn cannot be null");
+      client =
+          new AsyncKuduClient.AsyncKuduClientBuilder(spec.masterAddresses()).build().syncClient();
+      table = client.openTable(spec.table());
+    }
+
+    @Override
+    public void openSession() throws KuduException {
+      // errors are collected per session so we align session with the bundle
+      session = client.newSession();
+      // async flushing as per the official kudu-spark approach
+      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    }
+
+    @Override
+    public void write(T entity) throws KuduException {
+      checkState(session != null, "must call openSession() before writing");
+      session.apply(formatFunction.apply(new TableAndRecord(table, entity)));
+    }
+
+    @Override
+    public void closeSession() throws Exception {
+      try {
+        session.close();
+        if (session.countPendingErrors() > 0) {
+          LOG.error("At least {} errors occurred writing to Kudu", session.countPendingErrors());
+          RowError[] errors = session.getPendingErrors().getRowErrors();
+          for (int i = 0; errors != null && i < 3 && i < errors.length; i++) {
+            LOG.error("Sample error: {}", errors[i]);
+          }
+          throw new Exception(
+              "At least " + session.countPendingErrors() + " error(s) occurred writing to Kudu");
+        }
+      } finally {
+        session = null;
+      }
+    }
+
+    @Override
+    public void close() throws Exception {
+      client.close();
+      client = null;
+    }
+  }
+
+  /** Bounded reader of an Apache Kudu table. */
+  class ReaderImpl extends BoundedSource.BoundedReader<T> {
+    private final KuduIO.KuduSource<T> source;
+    private KuduClient client;
+    private KuduScanner scanner;
+    private RowResultIterator iter;
+    private RowResult current;
+    private long recordsReturned;
+
+    ReaderImpl(KuduIO.KuduSource<T> source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      LOG.debug("Starting Kudu reader");
+      client =
+          new AsyncKuduClient.AsyncKuduClientBuilder(source.spec.getMasterAddresses())
+              .build()
+              .syncClient();
+
+      if (source.serializedToken != null) {
+        // tokens available if the source is already split
+        scanner = KuduScanToken.deserializeIntoScanner(source.serializedToken, client);
+      } else {
+        KuduTable table = client.openTable(source.spec.getTable());
+        KuduScanner.KuduScannerBuilder builder =
+            table.getAsyncClient().syncClient().newScannerBuilder(table);
+
+        configureBuilder(source.spec, table.getSchema(), builder);
+        scanner = builder.build();
+      }
+
+      return advance();
+    }
+
+    /**
+     * Returns the current record transformed into the desired type.
+     *
+     * @return the current record
+     * @throws NoSuchElementException If the current does not exist
+     */
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current != null) {
+        return source.spec.getParseFn().apply(current);
+
+      } else {
+        throw new NoSuchElementException(
+            "No current record (Indicates misuse. Perhaps advance() was not called?)");
+      }
+    }
+
+    @Override
+    public boolean advance() throws KuduException {
+      // scanner pages over results, with each page holding an iterator of records
+      if (iter == null || (!iter.hasNext() && scanner.hasMoreRows())) {
+        iter = scanner.nextRows();
+      }
+
+      if (iter != null && iter.hasNext()) {
+        current = iter.next();
+        ++recordsReturned;
+        return true;
+      }
+
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      LOG.debug("Closing reader after reading {} records.", recordsReturned);
+      if (scanner != null) {
+        scanner.close();
+        scanner = null;
+      }
+      if (client != null) {
+        client.close();
+        client = null;
+      }
+    }
+
+    @Override
+    public synchronized KuduIO.KuduSource getCurrentSource() {
+      return source;
+    }
+  }
+
+  /** Creates a new synchronous client. */
+  private synchronized KuduClient getKuduClient(List<String> masterAddresses) {
+    return new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses).build().syncClient();
+  }
+
+  /** Configures the scanner builder to conform to the spec. */
+  private static <T2> void configureBuilder(
+      KuduIO.Read<T2> spec, Schema schema, AbstractKuduScannerBuilder builder) {
+    builder.cacheBlocks(true); // as per kudu-spark
+    if (spec.getBatchSize() != null) {
+      builder.batchSizeBytes(spec.getBatchSize());
+    }
+    if (spec.getProjectedColumns() != null) {
+      builder.setProjectedColumnNames(spec.getProjectedColumns());
+    }
+    if (spec.getFaultTolerent() != null) {
+      builder.setFaultTolerant(spec.getFaultTolerent());
+    }
+    if (spec.getSerializablePredicates() != null) {
+      for (Common.ColumnPredicatePB predicate : spec.getSerializablePredicates()) {
+        builder.addPredicate(KuduPredicate.fromPB(schema, predicate));
+      }
+    }
+  }
+
+  /** Wraps the callable converting checked to RuntimeExceptions. */
+  private static <T> T uncheckCall(Callable<T> callable) {
+    try {
+      return callable.call();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/TableAndRecord.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/TableAndRecord.java
new file mode 100644
index 00000000000..06db175d082
--- /dev/null
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/TableAndRecord.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import org.apache.kudu.client.KuduTable;
+
+/**
+ * A wrapper for a {@link KuduTable} and a typed record of type {@code T}.
+ *
+ * @param <T> The type of the record
+ */
+public class TableAndRecord<T> {
+  private final KuduTable kuduTable;
+  private final T typedRecord;
+
+  public TableAndRecord(KuduTable table, T record) {
+    this.kuduTable = table;
+    this.typedRecord = record;
+  }
+
+  /** Returns the table the record is destined for. */
+  public KuduTable getTable() {
+    return kuduTable;
+  }
+
+  /** Returns the typed record. */
+  public T getRecord() {
+    return typedRecord;
+  }
+}
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/package-info.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/package-info.java
new file mode 100644
index 00000000000..cece7e2efdf
--- /dev/null
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from/to Apache Kudu.
+ *
+ * @see org.apache.beam.sdk.io.kudu.KuduIO
+ */
+package org.apache.beam.sdk.io.kudu;
diff --git a/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOIT.java b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOIT.java
new file mode 100644
index 00000000000..ba2e9d6807b
--- /dev/null
+++ b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOIT.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.COL_ID;
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.COL_NAME;
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.GenerateUpsert;
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.SCHEMA;
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.createTableOptions;
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.rowCount;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.kudu.KuduIO} on an independent Kudu instance.
+ *
+ * <p>This test requires a running instance of Kudu. Pass in connection information using
+ * PipelineOptions:
+ *
+ * <pre>
+ *  ./gradlew integrationTest -p sdks/java/io/kudu -DintegrationTestPipelineOptions='[
+ *    "--kuduMasterAddresses=127.0.0.1",
+ *    "--kuduTable=beam-integration-test",
+ *    "--numberOfRecords=100000" ]'
+ *    --tests org.apache.beam.sdk.io.kudu.KuduIOIT
+ *    -DintegrationTestRunner=direct
+ * </pre>
+ *
+ * <p>To start a Kudu server in docker you can use the following:
+ *
+ * <pre>
+ *   docker pull usuresearch/apache-kudu
+ *   docker run -d --rm --name apache-kudu -p 7051:7051 \
+ *     -p 7050:7050 -p 8051:8051 -p 8050:8050 usuresearch/apache-kudu
+ * </pre>
+ *
+ * <p>See <a href="https://hub.docker.com/r/usuresearch/apache-kudu/">for information about this
+ * image</a>.
+ *
+ * <p>Once running you may need to visit <a href="http://localhost:8051/masters">the masters
+ * list</a> and copy the host (e.g. <code>host: "e94929167e2a"</code>) adding it to your <code>
+ * etc/hosts</code> file pointing to localhost e.g.:
+ *
+ * <pre>
+ *   127.0.0.1 localhost e94929167e2a
+ * </pre>
+ */
+public class KuduIOIT {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduIOIT.class);
+
+  /** KuduIOIT options. */
+  public interface KuduPipelineOptions extends IOTestPipelineOptions {
+    @Description("Kudu master addresses (comma separated address list)")
+    @Default.String("127.0.0.1:7051")
+    String getKuduMasterAddresses();
+
+    void setKuduMasterAddresses(String masterAddresses);
+
+    @Description("Kudu table")
+    @Default.String("beam-integration-test")
+    String getKuduTable();
+
+    void setKuduTable(String name);
+  }
+
+  private static KuduPipelineOptions options;
+  private static KuduClient client;
+  private static KuduTable kuduTable;
+
+  @Rule public final TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** Connects to Kudu and (re)creates the test table so every run starts from an empty table. */
+  @BeforeClass
+  public static void setUp() throws KuduException {
+    PipelineOptionsFactory.register(KuduPipelineOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(KuduPipelineOptions.class);
+
+    // synchronous operations
+    client =
+        new AsyncKuduClient.AsyncKuduClientBuilder(options.getKuduMasterAddresses())
+            .build()
+            .syncClient();
+
+    if (client.tableExists(options.getKuduTable())) {
+      client.deleteTable(options.getKuduTable());
+    }
+
+    kuduTable =
+        client.createTable(options.getKuduTable(), KuduTestUtils.SCHEMA, createTableOptions());
+  }
+
+  /** Drops the test table, always closing the client even if the drop fails. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      if (client.tableExists(options.getKuduTable())) {
+        client.deleteTable(options.getKuduTable());
+      }
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Runs the write once, then the three read scenarios against the written data. A fresh read
+   * pipeline is created between scenarios because a {@link TestPipeline} can only be run once.
+   */
+  @Test
+  public void testWriteThenRead() throws Exception {
+    runWrite();
+    runReadAll();
+    readPipeline = TestPipeline.create();
+    runReadProjectedColumns();
+    readPipeline = TestPipeline.create();
+    runReadWithPredicates();
+  }
+
+  /** Reads back every record and asserts the count matches what was written. */
+  private void runReadAll() {
+    // Lambdas erase too much type information so specify the coder
+    PCollection<String> output =
+        readPipeline.apply(
+            KuduIO.<String>read()
+                .withMasterAddresses(options.getKuduMasterAddresses())
+                .withTable(options.getKuduTable())
+                .withParseFn(
+                    (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+                .withCoder(StringUtf8Coder.of()));
+    PAssert.thatSingleton(output.apply("Count", Count.globally()))
+        .isEqualTo((long) options.getNumberOfRecords());
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /** Reads with id predicates restricting results to the half-open range [2, 7). */
+  private void runReadWithPredicates() {
+    PCollection<String> output =
+        readPipeline.apply(
+            "Read with predicates",
+            KuduIO.<String>read()
+                .withMasterAddresses(options.getKuduMasterAddresses())
+                .withTable(options.getKuduTable())
+                .withParseFn(
+                    (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+                .withPredicates(
+                    Arrays.asList(
+                        KuduPredicate.newComparisonPredicate(
+                            SCHEMA.getColumn(COL_ID), KuduPredicate.ComparisonOp.GREATER_EQUAL, 2),
+                        KuduPredicate.newComparisonPredicate(
+                            SCHEMA.getColumn(COL_ID), KuduPredicate.ComparisonOp.LESS, 7)))
+                .withCoder(StringUtf8Coder.of()));
+
+    // ids >= 2 and < 7 leaves exactly five records
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(5L);
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Tests that the projected columns are passed down to the Kudu scanner by attempting to read the
+   * {@value KuduTestUtils#COL_NAME} in the parse function when it is omitted.
+   */
+  private void runReadProjectedColumns() {
+    thrown.expect(IllegalArgumentException.class);
+    readPipeline
+        .apply(
+            "Read with projected columns",
+            KuduIO.<String>read()
+                .withMasterAddresses(options.getKuduMasterAddresses())
+                .withTable(options.getKuduTable())
+                .withParseFn(
+                    (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+                .withProjectedColumns(Collections.singletonList(COL_ID))) // COL_NAME excluded
+        .setCoder(StringUtf8Coder.of());
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /** Writes the configured number of records to Kudu and verifies the table row count. */
+  private void runWrite() throws Exception {
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(options.getNumberOfRecords()))
+        .apply(
+            "Write records to Kudu",
+            KuduIO.write()
+                .withMasterAddresses(options.getKuduMasterAddresses())
+                .withTable(options.getKuduTable())
+                .withFormatFn(new GenerateUpsert()));
+    writePipeline.run().waitUntilFinish();
+
+    Assert.assertThat(
+        "Wrong number of records in table",
+        rowCount(kuduTable),
+        equalTo(options.getNumberOfRecords()));
+  }
+}
diff --git a/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOTest.java b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOTest.java
new file mode 100644
index 00000000000..a39b87b4e22
--- /dev/null
+++ b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.COL_ID;
+import static org.apache.beam.sdk.io.kudu.KuduTestUtils.GenerateUpsert;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowResult;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A test of {@link KuduIO} using fake Kudu services.
+ *
+ * <p>Since Kudu is written in C++ it does not currently lend itself to easy unit tests from a Java
+ * environment. The Kudu project is actively working on a solution for this (see <a
+ * href="https://issues.apache.org/jira/browse/KUDU-2411">KUDU-2411</a>) which will be used in the
+ * future. In the meantime, only rudimentary tests exist here, with the preferred testing being
+ * carried out in {@link KuduIOIT}.
+ */
+public class KuduIOTest {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduIOTest.class);
+
+  @Rule public final TestPipeline writePipeline = TestPipeline.create();
+  @Rule public final TestPipeline readPipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Rule public final transient ExpectedLogs expectedWriteLogs = ExpectedLogs.none(FakeWriter.class);
+  @Rule public final transient ExpectedLogs expectedReadLogs = ExpectedLogs.none(FakeReader.class);
+
+  private KuduService<Integer> mockReadService;
+  private KuduService<String> mockWriteService;
+
+  private final int numberRecords = 10;
+  private final int targetParallelism = 3; // determined by the runner, but direct has min of 3
+
+  @Before
+  public void setUp() throws Exception {
+    // serializable mocks so they survive serialization by the runner
+    mockReadService = mock(KuduService.class, withSettings().serializable());
+    mockWriteService = mock(KuduService.class, withSettings().serializable());
+  }
+
+  /**
+   * Tests the read path using a {@link FakeReader}. The {@link KuduService} is mocked to simulate 4
+   * tablets and fake the encoding of a scanner for each tablet. The test verifies that the {@link
+   * KuduIO} correctly splits into 4 sources and instantiates a reader for each, and that the
+   * correct number of records are read.
+   */
+  @Test
+  public void testRead() throws KuduException {
+    when(mockReadService.createReader(any())).thenAnswer(new FakeReaderAnswer());
+    // Simulate the equivalent of Kudu providing an encoded scanner per tablet. Here we encode
+    // a range which the fake reader will use to simulate a single tablet read.
+    List<byte[]> fakeScanners =
+        Arrays.asList(
+            ByteBuffer.allocate(8).putInt(0).putInt(25).array(),
+            ByteBuffer.allocate(8).putInt(25).putInt(50).array(),
+            ByteBuffer.allocate(8).putInt(50).putInt(75).array(),
+            ByteBuffer.allocate(8).putInt(75).putInt(100).array());
+    when(mockReadService.createTabletScanners(any())).thenReturn(fakeScanners);
+
+    PCollection<Integer> output =
+        readPipeline.apply(
+            KuduIO.<Integer>read()
+                .withMasterAddresses("mock")
+                .withTable("Table")
+                // the fake reader only deals with a single int
+                .withParseFn(
+                    (SerializableFunction<RowResult, Integer>) input -> input.getInt(COL_ID))
+                .withKuduService(mockReadService)
+                .withCoder(BigEndianIntegerCoder.of()));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo((long) 100);
+
+    readPipeline.run().waitUntilFinish();
+
+    // check that the fake tablet ranges were read
+    expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 0, 25));
+    expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 25, 50));
+    expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 50, 75));
+    expectedReadLogs.verifyDebug(String.format(FakeReader.LOG_SET_RANGE, 75, 100));
+  }
+
+  /**
+   * Test the write path using a {@link FakeWriter} and verifying the expected log statements are
+   * written. This test ensures that the {@link KuduIO} correctly respects parallelism by
+   * deserializing writers and that each writer is opening and closing Kudu sessions.
+   */
+  @Test
+  public void testWrite() throws Exception {
+    when(mockWriteService.createWriter(any())).thenReturn(new FakeWriter());
+
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberRecords))
+        .apply(
+            "Write records to Kudu",
+            KuduIO.write()
+                .withMasterAddresses("ignored")
+                .withTable("ignored")
+                .withFormatFn(new GenerateUpsert()) // ignored (mocking Operation is pointless)
+                .withKuduService(mockWriteService));
+    writePipeline.run().waitUntilFinish();
+
+    for (int i = 1; i <= targetParallelism + 1; i++) {
+      expectedWriteLogs.verifyDebug(String.format(FakeWriter.LOG_OPEN_SESSION, i));
+      expectedWriteLogs.verifyDebug(
+          String.format(FakeWriter.LOG_WRITE, i)); // at least one per writer
+      expectedWriteLogs.verifyDebug(String.format(FakeWriter.LOG_CLOSE_SESSION, i));
+    }
+    // verify all entries written
+    // BUGFIX: the loop condition was "n > numberRecords", so it never executed
+    for (int n = 0; n < numberRecords; n++) {
+      expectedWriteLogs.verifyDebug(
+          String.format(FakeWriter.LOG_WRITE_VALUE, n)); // one per record value
+    }
+  }
+
+  /**
+   * A fake writer which logs operations using a unique id for the writer instance. The initial
+   * writer is created with an id of 0 and each deserialized instance will receive a unique integer
+   * id.
+   *
+   * <p>This writer allows tests to verify that sessions are opened and closed and the entities are
+   * passed to the write operation. However, the {@code formatFn} is ignored as the mocking required
+   * to replicate the {@link Operation} would render it a meaningless check.
+   */
+  private static class FakeWriter implements KuduService.Writer<Long> {
+    private static final Logger LOG = LoggerFactory.getLogger(FakeWriter.class);
+
+    static final String LOG_OPEN_SESSION = "FakeWriter[%d] openSession";
+    static final String LOG_WRITE = "FakeWriter[%d] write";
+    static final String LOG_WRITE_VALUE = "FakeWriter value[%d]";
+    static final String LOG_CLOSE_SESSION = "FakeWriter[%d] closeSession";
+
+    // share a counter across instances to uniquely identify the writers
+    private static final AtomicInteger counter = new AtomicInteger(0);
+    private transient int id = 0; // set on deserialization
+
+    @Override
+    public void openSession() {
+      LOG.debug(String.format(LOG_OPEN_SESSION, id));
+    }
+
+    @Override
+    public void write(Long entity) {
+      // BUGFIX: log the writer id (not the entity) so per-writer activity is verifiable,
+      // matching the LOG_WRITE assertions in testWrite; the value is logged separately below
+      LOG.debug(String.format(LOG_WRITE, id));
+      LOG.debug(String.format(LOG_WRITE_VALUE, entity));
+    }
+
+    @Override
+    public void closeSession() {
+      LOG.debug(String.format(LOG_CLOSE_SESSION, id));
+    }
+
+    @Override
+    public void close() {
+      // called on teardown which gives no guarantees
+      // BUGFIX: the message had two "{}" placeholders but only one argument
+      LOG.debug("FakeWriter[{}] close", id);
+    }
+
+    /** Sets the unique id on deserialization using the shared counter. */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      id = counter.incrementAndGet();
+    }
+  }
+
+  /**
+   * A fake reader which returns ascending integers, by default from 0 to 99 (inclusive of the
+   * lower bound, exclusive of the upper), or over the range encoded in the source's serialized
+   * token. This fakes the scanner-serialization behavior of Kudu.
+   */
+  private static class FakeReader extends BoundedSource.BoundedReader<Integer> {
+    private static final Logger LOG = LoggerFactory.getLogger(FakeReader.class);
+
+    static final String LOG_SET_RANGE = "FakeReader serializedToken gives range %d - %d";
+
+    private final KuduIO.KuduSource<Integer> source;
+    private int lowerInclusive = 0;
+    private int upperExclusive = 100;
+    private int current = 0;
+    private RowResult mockRecord = mock(RowResult.class); // simulate a row from Kudu
+
+    FakeReader(KuduIO.KuduSource<Integer> source) {
+      this.source = source;
+      // any request for an int from the mocked row will return the current value
+      when(mockRecord.getInt(any())).thenAnswer((Answer<Integer>) invocation -> current);
+    }
+
+    @Override
+    public boolean start() {
+      //  simulate the deserialization of a tablet scanner
+      if (source.serializedToken != null) {
+        ByteBuffer bb = ByteBuffer.wrap(source.serializedToken);
+        lowerInclusive = bb.getInt();
+        upperExclusive = bb.getInt();
+        LOG.debug(String.format(LOG_SET_RANGE, lowerInclusive, upperExclusive));
+      }
+      current = lowerInclusive;
+      return true;
+    }
+
+    @Override
+    public boolean advance() {
+      current++;
+      return current < upperExclusive;
+    }
+
+    @Override
+    public Integer getCurrent() {
+      return source.spec.getParseFn().apply(mockRecord);
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public BoundedSource<Integer> getCurrentSource() {
+      return source;
+    }
+  }
+
+  // required to be a static class for serialization
+  static class FakeReaderAnswer implements Answer<FakeReader>, Serializable {
+    @Override
+    @SuppressWarnings("unchecked") // Mockito arguments are untyped; the answer is only wired
+    // to createReader(KuduSource<Integer>) in testRead
+    public FakeReader answer(InvocationOnMock invocation) {
+      Object[] args = invocation.getArguments();
+      return new FakeReader((KuduIO.KuduSource<Integer>) args[0]);
+    }
+  }
+}
diff --git a/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduTestUtils.java b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduTestUtils.java
new file mode 100644
index 00000000000..285f1994c9a
--- /dev/null
+++ b/sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduTestUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Upsert;
+
+/** Utilities for Kudu tests. */
+class KuduTestUtils {
+  static final String COL_ID = "id";
+  static final String COL_NAME = "name";
+
+  /** Two-column schema (long key "id", non-null string "name") shared by the tests. */
+  static final Schema SCHEMA =
+      new Schema(
+          ImmutableList.of(
+              new ColumnSchema.ColumnSchemaBuilder(COL_ID, Type.INT64).key(true).build(),
+              new ColumnSchema.ColumnSchemaBuilder(COL_NAME, Type.STRING)
+                  .nullable(false)
+                  .desiredBlockSize(4096)
+                  .encoding(ColumnSchema.Encoding.PLAIN_ENCODING)
+                  .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION)
+                  .build()));
+
+  // static utility holder; never instantiated
+  private KuduTestUtils() {}
+
+  /** Returns table options with a single range partition on the key and one replica. */
+  static CreateTableOptions createTableOptions() {
+    return new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of(COL_ID))
+        .setNumReplicas(1);
+  }
+
+  /** Creates an Upsert Operation that matches the schema for each input. */
+  static class GenerateUpsert implements KuduIO.FormatFunction<Long> {
+    @Override
+    public Operation apply(TableAndRecord<Long> input) {
+      Upsert upsert = input.getTable().newUpsert();
+      PartialRow row = upsert.getRow();
+      row.addLong(COL_ID, input.getRecord());
+      row.addString(COL_NAME, input.getRecord() + ": name");
+      return upsert;
+    }
+  }
+
+  /** Returns the count of rows for the given table, scanning it fully. */
+  static int rowCount(KuduTable table) throws KuduException {
+    KuduScanner scanner = table.getAsyncClient().syncClient().newScannerBuilder(table).build();
+    try {
+      int rowCount = 0;
+      while (scanner.hasMoreRows()) {
+        rowCount += scanner.nextRows().getNumRows();
+      }
+      return rowCount;
+    } finally {
+      // always release the scanner, even if the scan fails mid-way
+      scanner.close();
+    }
+  }
+}
diff --git a/sdks/java/io/kudu/src/test/resources/log4j-test.properties b/sdks/java/io/kudu/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..4c74d85d7c6
--- /dev/null
+++ b/sdks/java/io/kudu/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/settings.gradle b/settings.gradle
index 39d62d0ee89..01ea889a542 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -138,6 +138,8 @@ include "beam-sdks-java-io-kafka"
 project(":beam-sdks-java-io-kafka").dir = file("sdks/java/io/kafka")
 include "beam-sdks-java-io-kinesis"
 project(":beam-sdks-java-io-kinesis").dir = file("sdks/java/io/kinesis")
+include "beam-sdks-java-io-kudu"
+project(":beam-sdks-java-io-kudu").dir = file("sdks/java/io/kudu")
 include "beam-sdks-java-io-mongodb"
 project(":beam-sdks-java-io-mongodb").dir = file("sdks/java/io/mongodb")
 include "beam-sdks-java-io-mqtt"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 129456)
    Time Spent: 4.5h  (was: 4h 20m)

> Add KuduIO
> ----------
>
>                 Key: BEAM-2661
>                 URL: https://issues.apache.org/jira/browse/BEAM-2661
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Tim Robertson
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> New IO for Apache Kudu ([https://kudu.apache.org/overview.html]).
> This work is in progress [on this branch|https://github.com/timrobertson100/beam/tree/BEAM-2661-KuduIO] with design aspects documented below.
> h2. The API
> The {{KuduIO}} API requires the user to provide a function to convert objects into operations. This is similar to the {{JdbcIO}} but different to others, such as {{HBaseIO}} which requires a pre-transform stage beforehand to convert into the mutations to apply. It was originally intended to copy the {{HBaseIO}} approach, but this was not possible:
>  # The Kudu [Operation|https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html] is a fat class, and is a subclass of {{KuduRpc<OperationResponse>}}. It holds RPC logic, callbacks and a Kudu client. Because of this the {{Operation}} does not serialize and furthermore, the logic for encoding the operations (Insert, Upsert etc) in the Kudu Java API are one way only (no decode) because the server is written in C++.
>  # An alternative could be to introduce a new object to beam (e.g. {{o.a.b.sdk.io.kudu.KuduOperation}}) to enable {{PCollection<KuduOperation>}}. This was considered but was discounted because:
>  ## It is not a familiar API to those already knowing Kudu
>  ## It still requires serialization and deserialization of the operations. Using the existing Kudu approach of serializing into compact byte arrays would require a decoder along the lines of [this almost complete example|https://gist.github.com/timrobertson100/df77d1337ba8f5609319751ee7c6e01e]. This is possible but has fragilities given the Kudu code itself continues to evolve. 
>  ## It becomes a trivial codebase in Beam to maintain by deferring the object-to-mutation mapping to within the KuduIO transform. {{JdbcIO}} gives us the precedent to do this.
> h2. Testing framework
> {{Kudu}} is written in C++. While a [TestMiniKuduCluster|https://github.com/cloudera/kudu/blob/master/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java] does exist in Java, it requires binaries to be available for the target environment which is not portable (edit: this is now a [work in progress|https://issues.apache.org/jira/browse/KUDU-2411] in Kudu). Therefore we opt for the following:
>  # Unit tests will use a mock Kudu client
>  # Integration tests will cover the full aspects of the {{KuduIO}} and use a Docker based Kudu instance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message