beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [6/6] beam git commit: Add default implementations of Coder methods to Coder
Date Fri, 05 May 2017 20:35:26 GMT
Add default implementations of Coder methods to Coder

Remove them from StructuredCoder. These are sensible defaults implemented in
terms of other Coder methods.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63258c69
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63258c69
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63258c69

Branch: refs/heads/master
Commit: 63258c6986866b5bef58043b056d5c0dfec7303f
Parents: 655947b
Author: Thomas Groh <tgroh@google.com>
Authored: Fri May 5 10:10:40 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/coders/Coder.java  | 121 ++++++++++++++++---
 .../apache/beam/sdk/coders/StructuredCoder.java |  67 ++--------
 2 files changed, 108 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63258c69/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 061e9e5..41e83ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -22,6 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -206,6 +209,30 @@ public abstract class Coder<T> implements Serializable {
   public abstract void verifyDeterministic() throws Coder.NonDeterministicException;
 
   /**
+   * Verifies all of the provided coders are deterministic. If any are not, throws a {@link
+   * NonDeterministicException} for the {@code target} {@link Coder}.
+   */
+  public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
+      throws NonDeterministicException {
+    for (Coder<?> coder : coders) {
+      try {
+        coder.verifyDeterministic();
+      } catch (NonDeterministicException e) {
+        throw new NonDeterministicException(target, message, e);
+      }
+    }
+  }
+
+  /**
+   * Verifies all of the provided coders are deterministic. If any are not, throws a {@link
+   * NonDeterministicException} for the {@code target} {@link Coder}.
+   */
+  public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
+      throws NonDeterministicException {
+    verifyDeterministic(target, message, Arrays.asList(coders));
+  }
+
+  /**
    * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}.
    *
    * <p>Whenever the encoded bytes of two values are equal, then the original values are equal
@@ -214,28 +241,50 @@ public abstract class Coder<T> implements Serializable {
    * <p>This condition is most notably false for arrays. More generally, this condition is false
    * whenever {@code equals()} compares object identity, rather than performing a
    * semantic/structural comparison.
+   *
+   * <p>By default, returns false.
    */
-  public abstract boolean consistentWithEquals();
+  public boolean consistentWithEquals() {
+    return false;
+  }
 
   /**
-   * Returns an object with an {@code Object.equals()} method that represents structural equality
-   * on the argument.
+   * Returns an object with an {@code Object.equals()} method that represents structural equality on
+   * the argument.
    *
    * <p>For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the
    * same, then it must be the case that {@code structuralValue(x).equals(@code structuralValue(y)}.
    *
    * <p>Most notably:
+   *
    * <ul>
    *   <li>The structural value for an array coder should perform a structural comparison of the
-   *   contents of the arrays, rather than the default behavior of comparing according to object
-   *   identity.
-   *   <li>The structural value for a coder accepting {@code null} should be a proper object with
-   *   an {@code equals()} method, even if the input value is {@code null}.
+   *       contents of the arrays, rather than the default behavior of comparing according to object
+   *       identity.
+   *   <li>The structural value for a coder accepting {@code null} should be a proper object with an
+   *       {@code equals()} method, even if the input value is {@code null}.
    * </ul>
    *
    * <p>See also {@link #consistentWithEquals()}.
+   *
+   * <p>By default, if this coder is {@link #consistentWithEquals()}, and the value is not null,
+   * returns the provided object. Otherwise, encodes the value into a {@code byte[]}, and returns
+   * an object that performs array equality on the encoded bytes.
    */
-  public abstract Object structuralValue(T value);
+  public Object structuralValue(T value) {
+    if (value != null && consistentWithEquals()) {
+      return value;
+    } else {
+      try {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        encode(value, os, Context.OUTER);
+        return new StructuralByteArray(os.toByteArray());
+      } catch (Exception exn) {
+        throw new IllegalArgumentException(
+            "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+      }
+    }
+  }
 
   /**
    * Returns whether {@link #registerByteSizeObserver} cheap enough to
@@ -246,21 +295,44 @@ public abstract class Coder<T> implements Serializable {
    * <p>Not intended to be called by user code, but instead by
    * {@link PipelineRunner}
    * implementations.
+   *
+   * <p>By default, returns false. The default {@link #registerByteSizeObserver} implementation
+   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+   *         unless it is overridden. This is considered expensive.
    */
-  public abstract boolean isRegisterByteSizeObserverCheap(T value);
+  public boolean isRegisterByteSizeObserverCheap(T value) {
+    return isRegisterByteSizeObserverCheap(value, Context.NESTED);
+  }
 
   /**
-   * Returns whether {@link #registerByteSizeObserver} cheap enough to
-   * call for every element, that is, if this {@code Coder} can
-   * calculate the byte size of the element to be coded in roughly
-   * constant time (or lazily).
+   * {@inheritDoc}
    *
    * <p>Not intended to be called by user code, but instead by
    * {@link PipelineRunner}
    * implementations.
+   *
+   * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
+   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+   *         unless it is overridden. This is considered expensive.
    */
   @Deprecated
-  public abstract boolean isRegisterByteSizeObserverCheap(T value, Context context);
+  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
+    return false;
+  }
+
+  /**
+   * Returns the size in bytes of the encoded value using this coder.
+   */
+  protected long getEncodedElementByteSize(T value, Context context)
+      throws Exception {
+    try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
+      encode(value, os, context);
+      return os.getCount();
+    } catch (Exception exn) {
+      throw new IllegalArgumentException(
+          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+    }
+  }
 
   /**
    * Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -269,10 +341,14 @@ public abstract class Coder<T> implements Serializable {
    * <p>Not intended to be called by user code, but instead by
    * {@link PipelineRunner}
    * implementations.
+   *
+   * <p>By default, this notifies {@code observer} about the byte size
+   * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
    */
-  public abstract void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer)
-      throws Exception;
+  public void registerByteSizeObserver(T value, ElementByteSizeObserver observer)
+      throws Exception {
+    registerByteSizeObserver(value, observer, Context.NESTED);
+  }
 
   /**
    * Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -283,15 +359,20 @@ public abstract class Coder<T> implements Serializable {
    * implementations.
    */
   @Deprecated
-  public abstract void registerByteSizeObserver(
+  public void registerByteSizeObserver(
       T value, ElementByteSizeObserver observer, Context context)
-      throws Exception;
+      throws Exception {
+    observer.update(getEncodedElementByteSize(value, context));
+  }
 
   /**
    * Returns the {@link TypeDescriptor} for the type encoded.
    */
   @Experimental(Kind.CODER_TYPE_ENCODING)
-  public abstract TypeDescriptor<T> getEncodedTypeDescriptor();
+  public TypeDescriptor<T> getEncodedTypeDescriptor(){
+    return (TypeDescriptor<T>)
+        TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
+  }
 
   /**
    * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is

http://git-wip-us.apache.org/repos/asf/beam/blob/63258c69/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index 0c72618..437f10d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,13 +24,15 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
  * via the class name and recursively using {@link #getComponents}.
  *
+ * <p>A {@link StructuredCoder} should be defined purely in terms of its component coders, and
+ * contain no additional configuration.
+ *
  * <p>To extend {@link StructuredCoder}, override the following methods as appropriate:
  *
  * <ul>
@@ -101,12 +101,14 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     return builder.toString();
   }
 
+  @Override
   public void encode(T value, OutputStream outStream)
       throws CoderException, IOException {
     encode(value, outStream, Coder.Context.NESTED);
   }
 
   @Deprecated
+  @Override
   public void encodeOuter(T value, OutputStream outStream)
       throws CoderException, IOException {
     encode(value, outStream, Coder.Context.OUTER);
@@ -122,11 +124,13 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     }
   }
 
+  @Override
   public T decode(InputStream inStream) throws CoderException, IOException {
     return decode(inStream, Coder.Context.NESTED);
   }
 
   @Deprecated
+  @Override
   public T decodeOuter(InputStream inStream) throws CoderException, IOException {
     return decode(inStream, Coder.Context.OUTER);
   }
@@ -141,63 +145,6 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     }
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
-   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
-   *         unless it is overridden. This is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value) {
-    return isRegisterByteSizeObserverCheap(value, Context.NESTED);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
-   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
-   *         unless it is overridden. This is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
-    return false;
-  }
-
-  /**
-   * Returns the size in bytes of the encoded value using this coder.
-   */
-  protected long getEncodedElementByteSize(T value, Context context)
-      throws Exception {
-    try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
-      encode(value, os, context);
-      return os.getCount();
-    } catch (Exception exn) {
-      throw new IllegalArgumentException(
-          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-    }
-  }
-
-  @Override
-  public void registerByteSizeObserver(T value, ElementByteSizeObserver observer)
-      throws Exception {
-    registerByteSizeObserver(value, observer, Context.NESTED);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>For {@link StructuredCoder} subclasses, this notifies {@code observer} about the byte size
-   * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    observer.update(getEncodedElementByteSize(value, context));
-  }
-
   protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
       throws NonDeterministicException {
     for (Coder<?> coder : coders) {


Mime
View raw message