beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Add ReadTranslator
Date Tue, 23 May 2017 01:46:52 GMT
Repository: beam
Updated Branches:
  refs/heads/master 3c10c0bc8 -> a32bef96d


Add ReadTranslator

This translates Read transforms to ReadPayloads and back


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f35c98b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f35c98b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f35c98b

Branch: refs/heads/master
Commit: 7f35c98b5f77069bd21dc7dea4a7d046883d13a6
Parents: b633abe
Author: Thomas Groh <tgroh@google.com>
Authored: Thu May 18 10:23:35 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon May 22 16:04:27 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ReadTranslator.java       | 127 +++++++++++++
 .../core/construction/ReadTranslatorTest.java   | 179 +++++++++++++++++++
 .../beam/runners/dataflow/ReadTranslator.java   |   5 +-
 .../org/apache/beam/sdk/io/CountingSource.java  |  42 +++++
 4 files changed, 350 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
new file mode 100644
index 0000000..f944938
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.SerializableUtils;
+
+/**
+ * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
+ * {@link PTransform PTransforms} into {@link ReadPayload} protos, and for recovering the
+ * {@link Source} from such a payload.
+ *
+ * <p>A source is Java-serialized, wrapped in a {@link BytesValue}, packed into the parameter of a
+ * {@link FunctionSpec}, and tagged with a URN that distinguishes bounded from unbounded sources.
+ */
+public class ReadTranslator {
+  private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
+  private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1";
+
+  /**
+   * Translates a {@link Read.Bounded} into a {@link ReadPayload} marked
+   * {@link IsBounded#BOUNDED} whose source spec holds the Java-serialized {@link BoundedSource}.
+   */
+  public static ReadPayload toProto(Read.Bounded<?> read) {
+    return ReadPayload.newBuilder()
+        .setIsBounded(IsBounded.BOUNDED)
+        .setSource(toProto(read.getSource()))
+        .build();
+  }
+
+  /**
+   * Translates a {@link Read.Unbounded} into a {@link ReadPayload} marked
+   * {@link IsBounded#UNBOUNDED} whose source spec holds the Java-serialized
+   * {@link UnboundedSource}.
+   */
+  public static ReadPayload toProto(Read.Unbounded<?> read) {
+    return ReadPayload.newBuilder()
+        .setIsBounded(IsBounded.UNBOUNDED)
+        .setSource(toProto(read.getSource()))
+        .build();
+  }
+
+  /**
+   * Translates a {@link Source} into an {@link SdkFunctionSpec} containing its Java-serialized
+   * form, tagged with the URN matching its boundedness.
+   *
+   * @throws IllegalArgumentException if {@code source} is neither a {@link BoundedSource} nor an
+   *     {@link UnboundedSource}
+   */
+  public static SdkFunctionSpec toProto(Source<?> source) {
+    if (source instanceof BoundedSource) {
+      return toProto((BoundedSource<?>) source);
+    } else if (source instanceof UnboundedSource) {
+      return toProto((UnboundedSource<?, ?>) source);
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass()));
+    }
+  }
+
+  private static SdkFunctionSpec toProto(BoundedSource<?> source) {
+    return javaSerializedSourceSpec(JAVA_SERIALIZED_BOUNDED_SOURCE, source);
+  }
+
+  private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
+    return javaSerializedSourceSpec(JAVA_SERIALIZED_UNBOUNDED_SOURCE, source);
+  }
+
+  /** Java-serializes {@code source} into an {@link SdkFunctionSpec} tagged with {@code urn}. */
+  private static SdkFunctionSpec javaSerializedSourceSpec(String urn, Source<?> source) {
+    BytesValue serializedSource =
+        BytesValue.newBuilder()
+            .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+            .build();
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(FunctionSpec.newBuilder().setUrn(urn).setParameter(Any.pack(serializedSource)))
+        .build();
+  }
+
+  /**
+   * Recovers the {@link BoundedSource} stored in a {@link ReadPayload}.
+   *
+   * <p>NOTE: this performs Java deserialization of the payload bytes; only use it with payloads
+   * from a trusted pipeline.
+   *
+   * @throws IllegalArgumentException if the payload is not marked {@link IsBounded#BOUNDED}
+   * @throws InvalidProtocolBufferException if the spec parameter is not a packed
+   *     {@link BytesValue}
+   */
+  public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
+      throws InvalidProtocolBufferException {
+    checkArgument(
+        payload.getIsBounded().equals(IsBounded.BOUNDED),
+        "Expected a %s payload, got %s",
+        IsBounded.BOUNDED,
+        payload.getIsBounded());
+    return (BoundedSource<?>)
+        SerializableUtils.deserializeFromByteArray(serializedSourceBytes(payload), "BoundedSource");
+  }
+
+  /**
+   * Recovers the {@link UnboundedSource} stored in a {@link ReadPayload}.
+   *
+   * <p>NOTE: this performs Java deserialization of the payload bytes; only use it with payloads
+   * from a trusted pipeline.
+   *
+   * @throws IllegalArgumentException if the payload is not marked {@link IsBounded#UNBOUNDED}
+   * @throws InvalidProtocolBufferException if the spec parameter is not a packed
+   *     {@link BytesValue}
+   */
+  public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
+      throws InvalidProtocolBufferException {
+    checkArgument(
+        payload.getIsBounded().equals(IsBounded.UNBOUNDED),
+        "Expected a %s payload, got %s",
+        IsBounded.UNBOUNDED,
+        payload.getIsBounded());
+    return (UnboundedSource<?, ?>)
+        SerializableUtils.deserializeFromByteArray(
+            serializedSourceBytes(payload), "UnboundedSource");
+  }
+
+  /** Extracts the serialized source bytes packed into the payload's function spec parameter. */
+  private static byte[] serializedSourceBytes(ReadPayload payload)
+      throws InvalidProtocolBufferException {
+    return payload
+        .getSource()
+        .getSpec()
+        .getParameter()
+        .unpack(BytesValue.class)
+        .getValue()
+        .toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
new file mode 100644
index 0000000..a603e34
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link ReadTranslator}.
+ */
+@RunWith(Parameterized.class)
+public class ReadTranslatorTest {
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<Source<?>> data() {
+    return ImmutableList.<Source<?>>of(
+        CountingSource.unbounded(),
+        CountingSource.upTo(100L),
+        new TestBoundedSource(),
+        new TestUnboundedSource());
+  }
+
+  @Parameter(0)
+  public Source<?> source;
+
+  @Test
+  public void testToFromProtoBounded() throws Exception {
+    // TODO: Split into two tests.
+    assumeThat(source, instanceOf(BoundedSource.class));
+    BoundedSource<?> boundedSource = (BoundedSource<?>) this.source;
+    Read.Bounded<?> boundedRead = Read.from(boundedSource);
+    ReadPayload payload = ReadTranslator.toProto(boundedRead);
+    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED));
+    BoundedSource<?> deserializedSource = ReadTranslator.boundedSourceFromProto(payload);
+    assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
+  }
+
+  @Test
+  public void testToFromProtoUnbounded() throws Exception {
+    assumeThat(source, instanceOf(UnboundedSource.class));
+    UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source;
+    Read.Unbounded<?> unboundedRead = Read.from(unboundedSource);
+    ReadPayload payload = ReadTranslator.toProto(unboundedRead);
+    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED));
+    UnboundedSource<?, ?> deserializedSource = ReadTranslator.unboundedSourceFromProto(payload);
+    assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
+  }
+
+  private static class TestBoundedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException
{
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other != null && other.getClass().equals(TestBoundedSource.class);
+    }
+
+    @Override
+    public int hashCode() {
+      return TestBoundedSource.class.hashCode();
+    }
+  }
+
+  private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark>
{
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<byte[]> getDefaultOutputCoder() {
+      return ByteArrayCoder.of();
+    }
+
+    @Override
+    public List<? extends UnboundedSource<byte[], CheckpointMark>> split(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public UnboundedReader<byte[]> createReader(
+        PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException
{
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+      return new TestCheckpointMarkCoder();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other != null && other.getClass().equals(TestUnboundedSource.class);
+    }
+
+    @Override
+    public int hashCode() {
+      return TestUnboundedSource.class.hashCode();
+    }
+
+    private class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> {
+      @Override
+      public void encode(CheckpointMark value, OutputStream outStream)
+          throws CoderException, IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public CheckpointMark decode(InputStream inStream) throws CoderException, IOException
{
+        throw new UnsupportedOperationException();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index 30ecbf5..0b22d7e 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -40,9 +40,8 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
     translateReadHelper(transform.getSource(), transform, context);
   }
 
-  public static <T> void translateReadHelper(Source<T> source,
-      PTransform<?, ? extends PValue> transform,
-      TranslationContext context) {
+  public static <T> void translateReadHelper(
+      Source<T> source, PTransform<?, ? extends PValue> transform, TranslationContext
context) {
     try {
       StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
       stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);

http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 81082e5..6202c2b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DefaultCoder;
@@ -136,6 +137,16 @@ public class CountingSource {
     public Instant apply(Long input) {
       return Instant.now();
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof NowTimestampFn;
+    }
+
+    /** Returns a constant hash, consistent with {@code equals} treating every instance as equal. */
+    @Override
+    public int hashCode() {
+      // Hash the declaring class rather than getClass(): equals() is instanceof-based, so any
+      // instance that compares equal must also produce this same hash.
+      return NowTimestampFn.class.hashCode();
+    }
   }
 
   /**
@@ -180,6 +191,21 @@ public class CountingSource {
     public Coder<Long> getDefaultOutputCoder() {
       return VarLongCoder.of();
     }
+
+    /** Two bounded counting sources are equal when they cover the same start and end offsets. */
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof BoundedCountingSource) {
+        BoundedCountingSource otherSource = (BoundedCountingSource) other;
+        return getStartOffset() == otherSource.getStartOffset()
+            && getEndOffset() == otherSource.getEndOffset();
+      }
+      return false;
+    }
+
+    /** Hashes the same start and end offsets that {@code equals} compares. */
+    @Override
+    public int hashCode() {
+      // Both offsets are hashed as full longs so the end offset's high bits contribute too.
+      return Objects.hash(getStartOffset(), getEndOffset());
+    }
   }
 
   /**
@@ -341,6 +367,22 @@ public class CountingSource {
     public Coder<Long> getDefaultOutputCoder() {
       return VarLongCoder.of();
     }
+
+    /**
+     * Two unbounded counting sources are equal when their start, stride, elements per period,
+     * period and timestamp function all match.
+     */
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof UnboundedCountingSource)) {
+        return false;
+      }
+      UnboundedCountingSource that = (UnboundedCountingSource) other;
+      return this.start == that.start
+          && this.stride == that.stride
+          && this.elementsPerPeriod == that.elementsPerPeriod
+          && Objects.equals(this.period, that.period)
+          && Objects.equals(this.timestampFn, that.timestampFn);
+    }
+
+    /** Hashes exactly the fields that {@code equals} compares. */
+    @Override
+    public int hashCode() {
+      return Objects.hash(start, stride, elementsPerPeriod, period, timestampFn);
+    }
   }
 
   /**


Mime
View raw message