beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] incubator-beam git commit: Make WindowedValue like an interface, allow external implementations
Date Fri, 01 Jul 2016 02:51:42 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 61b9d723d -> 1a060f684


Make WindowedValue like an interface, allow external implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d835317c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d835317c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d835317c

Branch: refs/heads/master
Commit: d835317c230087a056fecaaff1d2e8ea8c910c23
Parents: bd21ead
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Jun 22 07:40:38 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Jun 24 14:47:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/WindowedValue.java | 181 ++++++++++++-------
 1 file changed, 112 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d835317c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1bbdbd9..f63a0d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -60,9 +60,6 @@ import java.util.Set;
  */
 public abstract class WindowedValue<T> {
 
-  protected final T value;
-  protected final PaneInfo pane;
-
   /**
    * Returns a {@code WindowedValue} with the given value, timestamp,
    * and windows.
@@ -147,11 +144,6 @@ public abstract class WindowedValue<T> {
     return new ValueInEmptyWindows<T>(value, pane);
   }
 
-  private WindowedValue(T value, PaneInfo pane) {
-    this.value = value;
-    this.pane = checkNotNull(pane);
-  }
-
   /**
    * Returns a new {@code WindowedValue} that is a copy of this one, but with a different
value,
    * which may have a new type {@code NewT}.
@@ -161,9 +153,7 @@ public abstract class WindowedValue<T> {
   /**
    * Returns the value of this {@code WindowedValue}.
    */
-  public T getValue() {
-    return value;
-  }
+  public abstract T getValue();
 
   /**
    * Returns the timestamp of this {@code WindowedValue}.
@@ -176,6 +166,11 @@ public abstract class WindowedValue<T> {
   public abstract Collection<? extends BoundedWindow> getWindows();
 
   /**
+   * Returns the pane of this {@code WindowedValue} in its window.
+   */
+  public abstract PaneInfo getPane();
+
+  /**
    * Returns a collection of {@link WindowedValue WindowedValues} identical to this one,
except each
    * is in exactly one of the windows that this {@link WindowedValue} is in.
    */
@@ -187,18 +182,28 @@ public abstract class WindowedValue<T> {
     return windowedValues.build();
   }
 
-  /**
-   * Returns the pane of this {@code WindowedValue} in its window.
-   */
-  public PaneInfo getPane() {
-    return pane;
-  }
-
   @Override
-  public abstract boolean equals(Object o);
+  public boolean equals(Object other) {
+    if (!(other instanceof WindowedValue)) {
+      return false;
+    } else {
+      WindowedValue<?> that = (WindowedValue<?>) other;
+
+      // Compare timestamps first as they are most likely to differ.
+      // Also compare timestamps according to millis-since-epoch because otherwise expensive
+      // comparisons are made on their Chronology objects.
+      return this.getTimestamp().isEqual(that.getTimestamp())
+          && Objects.equals(this.getValue(), that.getValue())
+          && Objects.equals(this.getWindows(), that.getWindows())
+          && Objects.equals(this.getPane(), that.getPane());
+    }
+  }
 
   @Override
-  public abstract int hashCode();
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane());
+  }
 
   @Override
   public abstract String toString();
@@ -207,11 +212,34 @@ public abstract class WindowedValue<T> {
       Collections.singletonList(GlobalWindow.INSTANCE);
 
   /**
+   * An abstract superclass for implementations of {@link WindowedValue} that stores the
value
+   * and pane info.
+   */
+  private abstract static class SimpleWindowedValue<T> extends WindowedValue<T>
{
+    private final T value;
+    private final PaneInfo pane;
+
+    protected SimpleWindowedValue(T value, PaneInfo pane) {
+      this.value = value;
+      this.pane = checkNotNull(pane);
+    }
+
+    @Override
+    public PaneInfo getPane() {
+      return pane;
+    }
+    @Override
+    public T getValue() {
+      return value;
+    }
+  }
+
+  /**
    * The abstract superclass of WindowedValue representations where
    * timestamp == MIN.
    */
   private abstract static class MinTimestampWindowedValue<T>
-      extends WindowedValue<T> {
+      extends SimpleWindowedValue<T> {
     public MinTimestampWindowedValue(T value, PaneInfo pane) {
       super(value, pane);
     }
@@ -233,8 +261,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new ValueInGlobalWindow<>(value, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new ValueInGlobalWindow<>(newValue, getPane());
     }
 
     @Override
@@ -246,23 +274,23 @@ public abstract class WindowedValue<T> {
     public boolean equals(Object o) {
       if (o instanceof ValueInGlobalWindow) {
         ValueInGlobalWindow<?> that = (ValueInGlobalWindow<?>) o;
-        return Objects.equals(that.pane, this.pane)
-            && Objects.equals(that.value, this.value);
+        return Objects.equals(that.getPane(), this.getPane())
+            && Objects.equals(that.getValue(), this.getValue());
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, pane);
+      return Objects.hash(getValue(), getPane());
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("pane", pane)
+          .add("value", getValue())
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -278,8 +306,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new ValueInEmptyWindows<>(value, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new ValueInEmptyWindows<>(newValue, getPane());
     }
 
     @Override
@@ -291,23 +319,23 @@ public abstract class WindowedValue<T> {
     public boolean equals(Object o) {
       if (o instanceof ValueInEmptyWindows) {
         ValueInEmptyWindows<?> that = (ValueInEmptyWindows<?>) o;
-        return Objects.equals(that.pane, this.pane)
-            && Objects.equals(that.value, this.value);
+        return Objects.equals(that.getPane(), this.getPane())
+            && Objects.equals(that.getValue(), this.getValue());
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, pane);
+      return Objects.hash(getValue(), getPane());
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("pane", pane)
+          .add("value", getValue())
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -317,8 +345,8 @@ public abstract class WindowedValue<T> {
    * timestamp is arbitrary.
    */
   private abstract static class TimestampedWindowedValue<T>
-      extends WindowedValue<T> {
-    protected final Instant timestamp;
+      extends SimpleWindowedValue<T> {
+    private final Instant timestamp;
 
     public TimestampedWindowedValue(T value,
                                     Instant timestamp,
@@ -346,8 +374,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new TimestampedValueInGlobalWindow<>(value, timestamp, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane());
     }
 
     @Override
@@ -360,25 +388,29 @@ public abstract class WindowedValue<T> {
       if (o instanceof TimestampedValueInGlobalWindow) {
         TimestampedValueInGlobalWindow<?> that =
             (TimestampedValueInGlobalWindow<?>) o;
-        return this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
-            && Objects.equals(that.pane, this.pane)
-            && Objects.equals(that.value, this.value);
+        // Compare timestamps first as they are most likely to differ.
+        // Also compare timestamps according to millis-since-epoch because otherwise expensive
+        // comparisons are made on their Chronology objects.
+        return this.getTimestamp().isEqual(that.getTimestamp())
+            && Objects.equals(that.getPane(), this.getPane())
+            && Objects.equals(that.getValue(), this.getValue());
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, pane, timestamp.getMillis());
+      // Hash only the millis of the timestamp to be consistent with equals
+      return Objects.hash(getValue(), getPane(), getTimestamp().getMillis());
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("timestamp", timestamp)
-          .add("pane", pane)
+          .add("value", getValue())
+          .add("timestamp", getTimestamp())
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -400,8 +432,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window,
getPane());
     }
 
     @Override
@@ -414,27 +446,31 @@ public abstract class WindowedValue<T> {
       if (o instanceof TimestampedValueInSingleWindow) {
         TimestampedValueInSingleWindow<?> that =
             (TimestampedValueInSingleWindow<?>) o;
-        return Objects.equals(that.value, this.value)
-            && this.timestamp.isEqual(that.timestamp) // don't compare chronology
objects
-            && Objects.equals(that.pane, this.pane)
+        // Compare timestamps first as they are most likely to differ.
+        // Also compare timestamps according to millis-since-epoch because otherwise expensive
+        // comparisons are made on their Chronology objects.
+        return this.getTimestamp().isEqual(that.getTimestamp())
+            && Objects.equals(that.getValue(), this.getValue())
+            && Objects.equals(that.getPane(), this.getPane())
             && Objects.equals(that.window, this.window);
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, timestamp.getMillis(), pane, window);
+      // Hash only the millis of the timestamp to be consistent with equals
+      return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window);
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("timestamp", timestamp)
+          .add("value", getValue())
+          .add("timestamp", getTimestamp())
           .add("window", window)
-          .add("pane", pane)
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -457,8 +493,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows,
getPane());
     }
 
     @Override
@@ -471,30 +507,37 @@ public abstract class WindowedValue<T> {
       if (o instanceof TimestampedValueInMultipleWindows) {
         TimestampedValueInMultipleWindows<?> that =
             (TimestampedValueInMultipleWindows<?>) o;
-        if (this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
-            && Objects.equals(that.value, this.value)
-            && Objects.equals(that.pane, this.pane)) {
+        // Compare timestamps first as they are most likely to differ.
+        // Also compare timestamps according to millis-since-epoch because otherwise expensive
+        // comparisons are made on their Chronology objects.
+        if (this.getTimestamp().isEqual(that.getTimestamp())
+            && Objects.equals(that.getValue(), this.getValue())
+            && Objects.equals(that.getPane(), this.getPane())) {
           ensureWindowsAreASet();
           that.ensureWindowsAreASet();
           return that.windows.equals(this.windows);
+        } else {
+          return false;
         }
+      } else {
+        return super.equals(o);
       }
-      return false;
     }
 
     @Override
     public int hashCode() {
+      // Hash only the millis of the timestamp to be consistent with equals
       ensureWindowsAreASet();
-      return Objects.hash(value, timestamp.getMillis(), pane, windows);
+      return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windows);
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("timestamp", timestamp)
+          .add("value", getValue())
+          .add("timestamp", getTimestamp())
           .add("windows", windows)
-          .add("pane", pane)
+          .add("pane", getPane())
           .toString();
     }
 


Mime
View raw message