beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] incubator-beam git commit: Remove inheritance from Create.TimestampedValues
Date Tue, 24 May 2016 20:43:13 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 65db44ce6 -> 5535fc3fd


Remove inheritance from Create.TimestampedValues

Previously, Create.TimestampedValues extends Create.Values. This
actually resulted in confusing behavior in one runner because
Create.Values was overridden using `instanceof` checks, which
accidentally pulled in Create.TimestampedValues.

Now Create.TimeStampedValues is a simple composite transform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0e12587
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0e12587
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0e12587

Branch: refs/heads/master
Commit: f0e125871d9ca6da9c8597a2216c3b44b9e85345
Parents: dc98211
Author: Kenneth Knowles <klk@google.com>
Authored: Thu May 19 20:28:36 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 19 20:28:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Create.java  | 144 +++++++++++--------
 1 file changed, 81 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0e12587/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 89e9985..0752113 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -263,53 +263,9 @@ public class Create<T> {
     public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException
{
       if (coder.isPresent()) {
         return coder.get();
+      } else {
+        return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
       }
-      // First try to deduce a coder using the types of the elements.
-      Class<?> elementClazz = Void.class;
-      for (T elem : elems) {
-        if (elem == null) {
-          continue;
-        }
-        Class<?> clazz = elem.getClass();
-        if (elementClazz.equals(Void.class)) {
-          elementClazz = clazz;
-        } else if (!elementClazz.equals(clazz)) {
-          // Elements are not the same type, require a user-specified coder.
-          throw new CannotProvideCoderException(
-              "Cannot provide coder for Create: The elements are not all of the same class.");
-        }
-      }
-
-      if (elementClazz.getTypeParameters().length == 0) {
-        try {
-          @SuppressWarnings("unchecked") // elementClazz is a wildcard type
-          Coder<T> coder = (Coder<T>) input.getPipeline().getCoderRegistry()
-              .getDefaultCoder(TypeDescriptor.of(elementClazz));
-          return coder;
-        } catch (CannotProvideCoderException exc) {
-          // let the next stage try
-        }
-      }
-
-      // If that fails, try to deduce a coder using the elements themselves
-      Optional<Coder<T>> coder = Optional.absent();
-      for (T elem : elems) {
-        Coder<T> c = input.getPipeline().getCoderRegistry().getDefaultCoder(elem);
-        if (!coder.isPresent()) {
-          coder = Optional.of(c);
-        } else if (!Objects.equals(c, coder.get())) {
-          throw new CannotProvideCoderException(
-              "Cannot provide coder for elements of " + Create.class.getSimpleName() + ":"
-              + " For their common class, no coder could be provided."
-              + " Based on their values, they do not all default to the same Coder.");
-        }
-      }
-
-      if (!coder.isPresent()) {
-        throw new CannotProvideCoderException("Unable to infer a coder. Please register "
-            + "a coder for ");
-      }
-      return coder.get();
     }
 
     /////////////////////////////////////////////////////////////////////////////
@@ -468,7 +424,7 @@ public class Create<T> {
    * A {@code PTransform} that creates a {@code PCollection} whose elements have
    * associated timestamps.
    */
-  public static class TimestampedValues<T> extends Values<T> {
+  public static class TimestampedValues<T> extends PTransform<PInput, PCollection<T>>{
     /**
      * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the
given
      * {@code Coder<T>} to decode each of the objects into a
@@ -482,17 +438,30 @@ public class Create<T> {
      * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link
VoidCoder}
      * is used.
      */
-    @Override
     public TimestampedValues<T> withCoder(Coder<T> coder) {
-      return new TimestampedValues<>(elems, Optional.<Coder<T>>of(coder));
+      return new TimestampedValues<>(timestampedElements, Optional.<Coder<T>>of(coder));
     }
 
     @Override
     public PCollection<T> apply(PInput input) {
       try {
-        Coder<T> coder = getDefaultOutputCoder(input);
+        Iterable<T> rawElements =
+            Iterables.transform(
+                timestampedElements,
+                new Function<TimestampedValue<T>, T>() {
+                  @Override
+                  public T apply(TimestampedValue<T> input) {
+                    return input.getValue();
+                  }
+                });
+        Coder<T> coder;
+        if (elementCoder.isPresent()) {
+          coder = elementCoder.get();
+        } else {
+          coder = getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
+        }
         PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
-            Create.of(elems).withCoder(TimestampedValueCoder.of(coder)));
+            Create.of(timestampedElements).withCoder(TimestampedValueCoder.of(coder)));
 
         PCollection<T> output = intermediate.apply(ParDo.of(new ConvertTimestamps<T>()));
         output.setCoder(coder);
@@ -506,18 +475,14 @@ public class Create<T> {
     /////////////////////////////////////////////////////////////////////////////
 
     /** The timestamped elements of the resulting PCollection. */
-    private final transient Iterable<TimestampedValue<T>> elems;
-
-    private TimestampedValues(Iterable<TimestampedValue<T>> elems,
-        Optional<Coder<T>> coder) {
-      super(
-          Iterables.transform(elems, new Function<TimestampedValue<T>, T>() {
-            @Override
-            public T apply(TimestampedValue<T> input) {
-              return input.getValue();
-            }
-          }), coder);
-      this.elems = elems;
+    private final transient Iterable<TimestampedValue<T>> timestampedElements;
+
+    private final transient Optional<Coder<T>> elementCoder;
+
+    private TimestampedValues(
+        Iterable<TimestampedValue<T>> timestampedElements, Optional<Coder<T>>
elementCoder) {
+      this.timestampedElements = timestampedElements;
+      this.elementCoder = elementCoder;
     }
 
     private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>,
T> {
@@ -527,4 +492,57 @@ public class Create<T> {
       }
     }
   }
+
+  private static <T> Coder<T> getDefaultCreateCoder(CoderRegistry registry, Iterable<T>
elems)
+      throws CannotProvideCoderException {
+    // First try to deduce a coder using the types of the elements.
+    Class<?> elementClazz = Void.class;
+    for (T elem : elems) {
+      if (elem == null) {
+        continue;
+      }
+      Class<?> clazz = elem.getClass();
+      if (elementClazz.equals(Void.class)) {
+        elementClazz = clazz;
+      } else if (!elementClazz.equals(clazz)) {
+        // Elements are not the same type, require a user-specified coder.
+        throw new CannotProvideCoderException(
+            String.format(
+                "Cannot provide coder for %s: The elements are not all of the same class.",
+                Create.class.getSimpleName()));
+      }
+    }
+
+    if (elementClazz.getTypeParameters().length == 0) {
+      try {
+        @SuppressWarnings("unchecked") // elementClazz is a wildcard type
+        Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz));
+        return coder;
+      } catch (CannotProvideCoderException exc) {
+        // let the next stage try
+      }
+    }
+
+    // If that fails, try to deduce a coder using the elements themselves
+    Optional<Coder<T>> coder = Optional.absent();
+    for (T elem : elems) {
+      Coder<T> c = registry.getDefaultCoder(elem);
+      if (!coder.isPresent()) {
+        coder = Optional.of(c);
+      } else if (!Objects.equals(c, coder.get())) {
+        throw new CannotProvideCoderException(
+            "Cannot provide coder for elements of "
+                + Create.class.getSimpleName()
+                + ":"
+                + " For their common class, no coder could be provided."
+                + " Based on their values, they do not all default to the same Coder.");
+      }
+    }
+
+    if (!coder.isPresent()) {
+      throw new CannotProvideCoderException(
+          "Unable to infer a coder. Please register " + "a coder for ");
+    }
+    return coder.get();
+  }
 }


Mime
View raw message