beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [06/50] [abbrv] incubator-beam git commit: Encode elements in InProcessCreate
Date Fri, 26 Feb 2016 22:54:43 GMT
Encode elements in InProcessCreate

There is no requirement that the elements of a Create are serializable;
however, they must be encodable. Because applying a Read requires the
source (and therefore the elements it holds) to be serializable, we must
ensure that serialization succeeds for all elements. Do so by encoding
all of the available elements into the source and decoding them in the
reader.

Implement getEstimatedSizeBytes and splitIntoBundles, now that we know
the size of the elements in bytes.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115256859


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/209364e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/209364e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/209364e6

Branch: refs/heads/master
Commit: 209364e6ac6890845c9a51253e6c97b31ad2191c
Parents: d58b7db
Author: tgroh <tgroh@google.com>
Authored: Mon Feb 22 12:21:09 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:25 2016 -0800

----------------------------------------------------------------------
 .../sdk/runners/inprocess/InProcessCreate.java  | 154 ++++++++++++-------
 .../runners/inprocess/InProcessCreateTest.java  | 126 +++++++++++++++
 2 files changed, 223 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/209364e6/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
index 0ff881f..9023b7b 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
@@ -18,35 +18,39 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.Create.Values;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import javax.annotation.Nullable;
+
 /**
- * An in memory implementation of the {@link Values Create.Values} {@link PTransform}, implemented'
+ * An in-process implementation of the {@link Values Create.Values} {@link PTransform}, implemented
  * using a {@link BoundedSource}.
  *
  * The coder is inferred via the {@link Values#getDefaultOutputCoder(PInput)} method on the
original
  * transform.
  */
-class InProcessCreate<T> extends PTransform<PInput, PCollection<T>> {
+class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>>
{
   private final Create.Values<T> original;
-  private final InMemorySource<T> source;
 
   public static <T> InProcessCreate<T> from(Create.Values<T> original)
{
     return new InProcessCreate<>(original);
@@ -54,38 +58,88 @@ class InProcessCreate<T> extends PTransform<PInput, PCollection<T>>
{
 
   private InProcessCreate(Values<T> original) {
     this.original = original;
-    this.source = new InMemorySource<>(original.getElements());
   }
 
   @Override
   public PCollection<T> apply(PInput input) {
-    input.getPipeline().getCoderRegistry();
-    PCollection<T> result = input.getPipeline().apply(Read.from(source));
+    Coder<T> elementCoder;
     try {
-      result.setCoder(original.getDefaultOutputCoder(input));
+      elementCoder = original.getDefaultOutputCoder(input);
     } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified.
"
-          + "Please set a coder by invoking Create.withCoder() explicitly.", e);
+      throw new IllegalArgumentException(
+          "Unable to infer a coder and no Coder was specified. "
+          + "Please set a coder by invoking Create.withCoder() explicitly.",
+          e);
+    }
+    InMemorySource<T> source;
+    try {
+      source = new InMemorySource<>(original.getElements(), elementCoder);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
     }
+    PCollection<T> result = input.getPipeline().apply(Read.from(source));
+    result.setCoder(elementCoder);
     return result;
   }
 
-  private static class InMemorySource<T> extends BoundedSource<T> {
-    private final Iterable<T> elements;
+  @Override
+  public PTransform<PInput, PCollection<T>> delegate() {
+    return original;
+  }
 
-    public InMemorySource(Iterable<T> elements) {
-      this.elements = elements;
+  @VisibleForTesting
+  static class InMemorySource<T> extends BoundedSource<T> {
+    private final Collection<byte[]> allElementsBytes;
+    private final long totalSize;
+    private final Coder<T> coder;
+
+    public InMemorySource(Iterable<T> elements, Coder<T> elemCoder)
+        throws CoderException, IOException {
+      allElementsBytes = new ArrayList<>();
+      long totalSize = 0L;
+      for (T element : elements) {
+        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+        allElementsBytes.add(bytes);
+        totalSize += bytes.length;
+      }
+      this.totalSize = totalSize;
+      this.coder = elemCoder;
+    }
+
+    /**
+     * Create a new source with the specified bytes. The new source owns the input element
bytes,
+     * which must not be modified after this constructor is called.
+     */
+    private InMemorySource(Collection<byte[]> elementBytes, long totalSize, Coder<T>
coder) {
+      this.allElementsBytes = ImmutableList.copyOf(elementBytes);
+      this.totalSize = totalSize;
+      this.coder = coder;
     }
 
     @Override
     public List<? extends BoundedSource<T>> splitIntoBundles(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      return Collections.singletonList(this);
+      ImmutableList.Builder<InMemorySource<T>> resultBuilder = ImmutableList.builder();
+      long currentSourceSize = 0L;
+      List<byte[]> currentElems = new ArrayList<>();
+      for (byte[] elemBytes : allElementsBytes) {
+        currentElems.add(elemBytes);
+        currentSourceSize += elemBytes.length;
+        if (currentSourceSize >= desiredBundleSizeBytes) {
+          resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
+          currentElems.clear();
+          currentSourceSize = 0L;
+        }
+      }
+      if (!currentElems.isEmpty()) {
+        resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
+      }
+      return resultBuilder.build();
     }
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return 0L;
+      return totalSize;
     }
 
     @Override
@@ -95,7 +149,7 @@ class InProcessCreate<T> extends PTransform<PInput, PCollection<T>>
{
 
     @Override
     public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws
IOException {
-      return new IterableReader();
+      return new BytesReader();
     }
 
     @Override
@@ -103,42 +157,19 @@ class InProcessCreate<T> extends PTransform<PInput, PCollection<T>>
{
 
     @Override
     public Coder<T> getDefaultOutputCoder() {
-      // Return a coder that exclusively throws exceptions. The coder is set properly in
apply, or
-      // an illegal argument exception is thrown.
-      return new StandardCoder<T>() {
-        @Override
-        public void encode(T value, OutputStream outStream,
-            com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-            throws CoderException, IOException {
-          throw new CoderException("Default Create Coder cannot be used");
-        }
-
-        @Override
-        public T decode(
-            InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-            throws CoderException, IOException {
-          throw new CoderException("Default Create Coder cannot be used");
-        }
-
-        @Override
-        public List<? extends Coder<?>> getCoderArguments() {
-          return Collections.emptyList();
-        }
-
-        @Override
-        public void verifyDeterministic()
-            throws com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException {
-          throw new NonDeterministicException(
-              this, Collections.<String>singletonList("Default Create Coder cannot
be used"));
-        }
-      };
+      return coder;
     }
 
-    private class IterableReader extends BoundedReader<T> {
-      private final PeekingIterator<T> iter;
+    private class BytesReader extends BoundedReader<T> {
+      private final PeekingIterator<byte[]> iter;
+      /**
+       * Use an optional to distinguish between null next element (as Optional.absent())
and no next
+       * element (next is null).
+       */
+      @Nullable private Optional<T> next;
 
-      public IterableReader() {
-        this.iter = Iterators.peekingIterator(elements.iterator());
+      public BytesReader() {
+        this.iter = Iterators.peekingIterator(allElementsBytes.iterator());
       }
 
       @Override
@@ -148,18 +179,27 @@ class InProcessCreate<T> extends PTransform<PInput, PCollection<T>>
{
 
       @Override
       public boolean start() throws IOException {
-        return iter.hasNext();
+        return advance();
       }
 
       @Override
       public boolean advance() throws IOException {
-        iter.next();
-        return iter.hasNext();
+        boolean hasNext = iter.hasNext();
+        if (hasNext) {
+          next = Optional.fromNullable(CoderUtils.decodeFromByteArray(coder, iter.next()));
+        } else {
+          next = null;
+        }
+        return hasNext;
       }
 
       @Override
+      @Nullable
       public T getCurrent() throws NoSuchElementException {
-        return iter.peek();
+        if (next == null) {
+          throw new NoSuchElementException();
+        }
+        return next.orNull();
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/209364e6/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
index 31deb71..4db014e 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreateTest.java
@@ -15,13 +15,31 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.NullableCoder;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessCreate.InMemorySource;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
 
 import org.hamcrest.Matchers;
 import org.junit.Rule;
@@ -30,7 +48,12 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Tests for {@link InProcessCreate}.
@@ -50,6 +73,19 @@ public class InProcessCreateTest {
     DataflowAssert.that(p.apply(converted)).containsInAnyOrder(2, 1, 3);
   }
 
+  @Test
+  public void testConvertsCreateWithNullElements() {
+    Create.Values<String> og =
+        Create.<String>of("foo", null, "spam", "ham", null, "eggs")
+            .withCoder(NullableCoder.of(StringUtf8Coder.of()));
+
+    InProcessCreate<String> converted = InProcessCreate.from(og);
+    TestPipeline p = TestPipeline.create();
+
+    DataflowAssert.that(p.apply(converted))
+        .containsInAnyOrder(null, "foo", null, "spam", "ham", "eggs");
+  }
+
   static class Record implements Serializable {}
 
   static class Record2 extends Record {}
@@ -70,4 +106,94 @@ public class InProcessCreateTest {
 
     fail("Unexpectedly Inferred Coder " + c.getCoder());
   }
+
+  /**
+   * An unserializable class to demonstrate encoding of elements.
+   */
+  private static class UnserializableRecord {
+    private final String myString;
+
+    private UnserializableRecord(String myString) {
+      this.myString = myString;
+    }
+
+    @Override
+    public int hashCode() {
+      return myString.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return myString.equals(((UnserializableRecord) o).myString);
+    }
+
+    static class UnserializableRecordCoder extends StandardCoder<UnserializableRecord>
{
+      private final Coder<String> stringCoder = StringUtf8Coder.of();
+
+      @Override
+      public void encode(
+          UnserializableRecord value,
+          OutputStream outStream,
+          com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+          throws CoderException, IOException {
+        stringCoder.encode(value.myString, outStream, context.nested());
+      }
+
+      @Override
+      public UnserializableRecord decode(
+          InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+          throws CoderException, IOException {
+        return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
+      }
+
+      @Override
+      public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+      }
+
+      @Override
+      public void verifyDeterministic() throws Coder.NonDeterministicException {
+        stringCoder.verifyDeterministic();
+      }
+    }
+  }
+
+  @Test
+  public void testSerializableOnUnserializableElements() throws Exception {
+    List<UnserializableRecord> elements =
+        ImmutableList.of(
+            new UnserializableRecord("foo"),
+            new UnserializableRecord("bar"),
+            new UnserializableRecord("baz"));
+    InMemorySource<UnserializableRecord> source =
+        new InMemorySource<>(elements, new UnserializableRecord.UnserializableRecordCoder());
+    SerializableUtils.ensureSerializable(source);
+  }
+
+  @Test
+  public void testSplitIntoBundles() throws Exception {
+    InProcessCreate.InMemorySource<Integer> source =
+        new InMemorySource<>(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12,
options);
+    assertThat(splitSources, hasSize(3));
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
+  }
+
+  @Test
+  public void testDoesNotProduceSortedKeys() throws Exception {
+    InProcessCreate.InMemorySource<String> source =
+        new InMemorySource<>(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
+    assertThat(source.producesSortedKeys(PipelineOptionsFactory.create()), is(false));
+  }
+
+  @Test
+  public void testGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+    Coder<Integer> coder = VarIntCoder.of();
+    InProcessCreate.InMemorySource<Integer> source =
+        new InMemorySource<>(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
+
+    Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
+    assertThat(defaultCoder, equalTo(coder));
+  }
 }


Mime
View raw message