beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yi...@apache.org
Subject [beam] branch master updated: [BEAM-2914] Add TimestampPrefixingWindowCoder as beam:coder:custom_window:v1. (#15137)
Date Fri, 16 Jul 2021 17:06:30 GMT
This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d98ad2  [BEAM-2914] Add TimestampPrefixingWindowCoder as beam:coder:custom_window:v1.
(#15137)
7d98ad2 is described below

commit 7d98ad2b6554258eeccf9f7c1b017f9a001bfd87
Author: Yichi Zhang <zyichi@google.com>
AuthorDate: Fri Jul 16 10:05:23 2021 -0700

    [BEAM-2914] Add TimestampPrefixingWindowCoder as beam:coder:custom_window:v1. (#15137)
    
    * Add TimestampPrefixingWindowCoder to java and python sdk as beam:coder:custom_window:v1.
---
 .../beam/model/fnexecution/v1/standard_coders.yaml |  23 +++
 .../pipeline/src/main/proto/beam_runner_api.proto  |  25 +++
 .../core/construction/CoderTranslators.java        |  15 ++
 .../core/construction/ModelCoderRegistrar.java     |   3 +
 .../runners/core/construction/ModelCoders.java     |   3 +
 .../core/construction/CoderTranslationTest.java    |   2 +
 .../runners/core/construction/CommonCoderTest.java |   8 +
 .../runners/dataflow/util/CloudObjectKinds.java    |   1 +
 .../dataflow/util/CloudObjectTranslators.java      |  29 ++++
 .../beam/runners/dataflow/util/CloudObjects.java   |   4 +-
 ...DefaultCoderCloudObjectTranslatorRegistrar.java |   1 +
 .../runners/dataflow/util/CloudObjectsTest.java    |   2 +
 .../go/test/regression/coders/fromyaml/fromyaml.go |   1 +
 .../sdk/coders/TimestampPrefixingWindowCoder.java  |  91 ++++++++++
 .../sdk/transforms/windowing/IntervalWindow.java   |  14 ++
 .../coders/TimestampPrefixingWindowCoderTest.java  | 185 +++++++++++++++++++++
 .../transforms/windowing/IntervalWindowTest.java   |  19 +++
 sdks/python/apache_beam/coders/coder_impl.py       |  28 ++++
 sdks/python/apache_beam/coders/coders.py           |  44 +++++
 .../apache_beam/coders/coders_test_common.py       |  13 ++
 .../apache_beam/coders/standard_coders_test.py     |   4 +-
 21 files changed, 513 insertions(+), 2 deletions(-)

diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index db7021d..ef37772 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -409,6 +409,7 @@ examples:
   "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
{f_map: {"foo": 9001, "bar": 9223372036854775807}}
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
{f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
 
 coder:
   urn: "beam:coder:row:v1"
@@ -440,3 +441,25 @@ examples:
     shardId: "",
     key: "key"
   }
+
+---
+
+# Java code snippet to generate example bytes:
+#  TimestampPrefixingWindowCoder<IntervalWindow> coder = TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of());
+#  Instant end = new Instant(-9223372036854410L);
+#  Duration span = Duration.millis(365L);
+#  IntervalWindow window = new IntervalWindow(end.minus(span), span);
+#  byte[] bytes = CoderUtils.encodeToByteArray(coder, window);
+#  String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1);
+#  String example = "";
+#  for(int i = 0; i < str.length(); i++){
+#    example += CharUtils.unicodeEscaped(str.charAt(i));
+#  }
+#  System.out.println(example);
+coder:
+  urn: "beam:coder:custom_window:v1"
+  components: [{urn: "beam:coder:interval_window:v1"}]
+
+examples:
+  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0067\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001"
: {window: {end: 1454293425000, span: 3600000}}
+  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0075\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002"
: {window: {end: -9223372036854410, span: 365}}
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index b5ad193..a367321 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -939,6 +939,31 @@ message StandardCoders {
     // Experimental.
     STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
 
+
+    // Encodes an arbitrary user defined window and its max timestamp (inclusive).
+    // The encoding format is:
+    //   maxTimestamp window
+    //
+    //   maxTimestamp - A big endian 8 byte integer representing millis-since-epoch.
+    //     The encoded representation is shifted so that the byte representations
+    //     of negative values are lexicographically ordered before the byte
+    //     representations of positive values. This is typically done by
+    //     subtracting -9223372036854775808 from the value and encoding it as a
+    //     signed big endian integer. Example values:
+    //
+    //     -9223372036854775808: 00 00 00 00 00 00 00 00
+    //                     -255: 7F FF FF FF FF FF FF 01
+    //                       -1: 7F FF FF FF FF FF FF FF
+    //                        0: 80 00 00 00 00 00 00 00
+    //                        1: 80 00 00 00 00 00 00 01
+    //                      256: 80 00 00 00 00 00 01 00
+    //      9223372036854775807: FF FF FF FF FF FF FF FF
+    //
+    //   window - the window is encoded using the supplied window coder.
+    //
+    // Components: Coder for the custom window type.
+    CUSTOM_WINDOW = 16 [(beam_urn) = "beam:coder:custom_window:v1"];
+
     // Additional Standard Coders
     // --------------------------
     // The following coders are not required to be implemented for an SDK or
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
index 38763e2..992a9eb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -189,6 +190,20 @@ class CoderTranslators {
     };
   }
 
+  static CoderTranslator<TimestampPrefixingWindowCoder<?>> timestampPrefixingWindow()
{
+    return new SimpleStructuredCoderTranslator<TimestampPrefixingWindowCoder<?>>()
{
+      @Override
+      protected TimestampPrefixingWindowCoder<?> fromComponents(List<Coder<?>>
components) {
+        return TimestampPrefixingWindowCoder.of((Coder<? extends BoundedWindow>) components.get(0));
+      }
+
+      @Override
+      public List<? extends Coder<?>> getComponents(TimestampPrefixingWindowCoder<?>
from) {
+        return from.getComponents();
+      }
+    };
+  }
+
   public abstract static class SimpleStructuredCoderTranslator<T extends Coder<?>>
       implements CoderTranslator<T> {
     @Override
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index 44e2caa..1fc8379 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
@@ -74,6 +75,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
           .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
           .put(RowCoder.class, ModelCoders.ROW_CODER_URN)
           .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN)
+          .put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN)
           .build();
 
   public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values();
@@ -96,6 +98,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
           .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class))
           .put(RowCoder.class, CoderTranslators.row())
           .put(ShardedKey.Coder.class, CoderTranslators.shardedKey())
+          .put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow())
           .build();
 
   static {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
index 86e1e7d..0ff70f1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
@@ -54,6 +54,8 @@ public class ModelCoders {
   public static final String INTERVAL_WINDOW_CODER_URN =
       getUrn(StandardCoders.Enum.INTERVAL_WINDOW);
 
+  public static final String CUSTOM_WINDOW_CODER_URN = getUrn(StandardCoders.Enum.CUSTOM_WINDOW);
+
   public static final String WINDOWED_VALUE_CODER_URN = getUrn(StandardCoders.Enum.WINDOWED_VALUE);
   public static final String PARAM_WINDOWED_VALUE_CODER_URN =
       getUrn(StandardCoders.Enum.PARAM_WINDOWED_VALUE);
@@ -82,6 +84,7 @@ public class ModelCoders {
           LENGTH_PREFIX_CODER_URN,
           GLOBAL_WINDOW_CODER_URN,
           INTERVAL_WINDOW_CODER_URN,
+          CUSTOM_WINDOW_CODER_URN,
           WINDOWED_VALUE_CODER_URN,
           DOUBLE_CODER_URN,
           ROW_CODER_URN,
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index 8a1af22..11543f3 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -95,6 +96,7 @@ public class CoderTranslationTest {
                       Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)),
                       Field.of("bar", FieldType.logicalType(FixedBytes.of(123))))))
           .add(ShardedKey.Coder.of(StringUtf8Coder.of()))
+          .add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of()))
           .build();
 
   /**
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
index 6c7744f..d7d3665 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
@@ -116,6 +117,7 @@ public class CommonCoderTest {
               WindowedValue.ParamWindowedValueCoder.class)
           .put(getUrn(StandardCoders.Enum.ROW), RowCoder.class)
           .put(getUrn(StandardCoders.Enum.SHARDED_KEY), ShardedKey.Coder.class)
+          .put(getUrn(StandardCoders.Enum.CUSTOM_WINDOW), TimestampPrefixingWindowCoder.class)
           .build();
 
   @AutoValue
@@ -345,6 +347,10 @@ public class CommonCoderTest {
       byte[] shardId = ((String) kvMap.get("shardId")).getBytes(StandardCharsets.ISO_8859_1);
       return ShardedKey.of(
           convertValue(kvMap.get("key"), coderSpec.getComponents().get(0), keyCoder), shardId);
+    } else if (s.equals(getUrn(StandardCoders.Enum.CUSTOM_WINDOW))) {
+      Map<String, Object> kvMap = (Map<String, Object>) value;
+      Coder windowCoder = ((TimestampPrefixingWindowCoder) coder).getWindowCoder();
+      return convertValue(kvMap.get("window"), coderSpec.getComponents().get(0), windowCoder);
     } else {
       throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn());
     }
@@ -502,6 +508,8 @@ public class CommonCoderTest {
       assertEquals(expectedValue, actualValue);
     } else if (s.equals(getUrn(StandardCoders.Enum.SHARDED_KEY))) {
       assertEquals(expectedValue, actualValue);
+    } else if (s.equals(getUrn(StandardCoders.Enum.CUSTOM_WINDOW))) {
+      assertEquals(expectedValue, actualValue);
     } else {
       throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
     }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
index c6264f3..0c397f4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.dataflow.util;
 class CloudObjectKinds {
   static final String KIND_GLOBAL_WINDOW = "kind:global_window";
   static final String KIND_INTERVAL_WINDOW = "kind:interval_window";
+  static final String KIND_CUSTOM_WINDOW = "kind:custom_window";
   static final String KIND_LENGTH_PREFIX = "kind:length_prefix";
   static final String KIND_PAIR = "kind:pair";
   static final String KIND_STREAM = "kind:stream";
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index e57205d..07377fd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
 import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
@@ -241,6 +242,34 @@ class CloudObjectTranslators {
     };
   }
 
+  static CloudObjectTranslator<TimestampPrefixingWindowCoder> customWindow() {
+    return new CloudObjectTranslator<TimestampPrefixingWindowCoder>() {
+      @Override
+      public CloudObject toCloudObject(
+          TimestampPrefixingWindowCoder target, SdkComponents sdkComponents) {
+        CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_CUSTOM_WINDOW);
+        return addComponents(result, target.getComponents(), sdkComponents);
+      }
+
+      @Override
+      public TimestampPrefixingWindowCoder fromCloudObject(CloudObject cloudObject) {
+        List<Coder<?>> components = getComponents(cloudObject);
+        checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+        return TimestampPrefixingWindowCoder.of((Coder<? extends BoundedWindow>) components.get(0));
+      }
+
+      @Override
+      public Class<? extends TimestampPrefixingWindowCoder> getSupportedClass() {
+        return TimestampPrefixingWindowCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_CUSTOM_WINDOW;
+      }
+    };
+  }
+
   /**
    * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is
of kind
    * "windowed_value".
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 766a32d..56c4d57 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
@@ -58,7 +59,8 @@ public class CloudObjects {
           Timer.Coder.class,
           LengthPrefixCoder.class,
           GlobalWindow.Coder.class,
-          FullWindowedValueCoder.class);
+          FullWindowedValueCoder.class,
+          TimestampPrefixingWindowCoder.class);
 
   static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
       CODER_TRANSLATORS = populateCoderTranslators();
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 01101c6..92070ef 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -65,6 +65,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
       ImmutableList.of(
           CloudObjectTranslators.globalWindow(),
           CloudObjectTranslators.intervalWindow(),
+          CloudObjectTranslators.customWindow(),
           CloudObjectTranslators.bytes(),
           CloudObjectTranslators.varInt(),
           CloudObjectTranslators.lengthPrefix(),
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index eb7e6cd..f1a004f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -144,6 +145,7 @@ public class CloudObjectsTest {
               .add(new ObjectCoder())
               .add(GlobalWindow.Coder.INSTANCE)
               .add(IntervalWindow.getCoder())
+              .add(TimestampPrefixingWindowCoder.of(IntervalWindow.getCoder()))
               .add(LengthPrefixCoder.of(VarLongCoder.of()))
               .add(IterableCoder.of(VarLongCoder.of()))
               .add(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))
diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
index 3544a5f..8ba9e99 100644
--- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go
+++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
@@ -45,6 +45,7 @@ var unimplementedCoders = map[string]bool{
 	"beam:coder:param_windowed_value:v1": true,
 	"beam:coder:timer:v1":                true,
 	"beam:coder:sharded_key:v1":          true,
+	"beam:coder:custom_window:v1":        true,
 }
 
 // Coder is a representation a serialized beam coder.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoder.java
new file mode 100644
index 0000000..ea7d56e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+/**
+ * A {@link TimestampPrefixingWindowCoder} wraps an arbitrary user custom window coder. While
encoding
+ * the custom window type, it extracts the maxTimestamp (inclusive) of the window and prefixes
it to
+ * the encoded bytes of the window produced by the user custom window coder.
+ *
+ * @param <T> The custom window type.
+ */
+public class TimestampPrefixingWindowCoder<T extends BoundedWindow> extends StructuredCoder<T>
{
+  private final Coder<T> windowCoder;
+
+  public static <T extends BoundedWindow> TimestampPrefixingWindowCoder<T> of(
+      Coder<T> windowCoder) {
+    return new TimestampPrefixingWindowCoder<>(windowCoder);
+  }
+
+  TimestampPrefixingWindowCoder(Coder<T> windowCoder) {
+    this.windowCoder = windowCoder;
+  }
+
+  public Coder<T> getWindowCoder() {
+    return windowCoder;
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream) throws CoderException, IOException
{
+    if (value == null) {
+      throw new CoderException("Cannot encode null window");
+    }
+    InstantCoder.of().encode(value.maxTimestamp(), outStream);
+    windowCoder.encode(value, outStream);
+  }
+
+  @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    InstantCoder.of().decode(inStream);
+    return windowCoder.decode(inStream);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Lists.newArrayList(windowCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    windowCoder.verifyDeterministic();
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return windowCoder.consistentWithEquals();
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(T value) {
+    return windowCoder.isRegisterByteSizeObserverCheap(value);
+  }
+
+  @Override
+  public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) throws
Exception {
+    InstantCoder.of().registerByteSizeObserver(value.maxTimestamp(), observer);
+    windowCoder.registerByteSizeObserver(value, observer);
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index 87ffa4a..143901a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.DurationCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -175,6 +176,19 @@ public class IntervalWindow extends BoundedWindow implements Comparable<Interval
     }
 
     @Override
+    public boolean isRegisterByteSizeObserverCheap(IntervalWindow value) {
+      return instantCoder.isRegisterByteSizeObserverCheap(value.end)
+          && durationCoder.isRegisterByteSizeObserverCheap(new Duration(value.start,
value.end));
+    }
+
+    @Override
+    public void registerByteSizeObserver(IntervalWindow value, ElementByteSizeObserver observer)
+        throws Exception {
+      instantCoder.registerByteSizeObserver(value.end, observer);
+      durationCoder.registerByteSizeObserver(new Duration(value.start, value.end), observer);
+    }
+
+    @Override
     public List<? extends Coder<?>> getCoderArguments() {
       return Collections.emptyList();
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoderTest.java
new file mode 100644
index 0000000..a9f8123
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoderTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.CoderProperties.TestElementByteSizeObserver;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class TimestampPrefixingWindowCoderTest {
+
+  private static class CustomWindow extends IntervalWindow {
+    private boolean isBig;
+
+    CustomWindow(Instant start, Instant end, boolean isBig) {
+      super(start, end);
+      this.isBig = isBig;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CustomWindow that = (CustomWindow) o;
+      return super.equals(o) && this.isBig == that.isBig;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), isBig);
+    }
+  }
+
+  private static class CustomWindowCoder extends CustomCoder<CustomWindow> {
+
+    private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
+    private static final int REGISTER_BYTE_SIZE = 1234;
+    private final boolean isConsistentWithEqual;
+    private final boolean isRegisterByteSizeCheap;
+
+    public static CustomWindowCoder of(
+        boolean isConsistentWithEqual, boolean isRegisterByteSizeCheap) {
+      return new CustomWindowCoder(isConsistentWithEqual, isRegisterByteSizeCheap);
+    }
+
+    private CustomWindowCoder(boolean isConsistentWithEqual, boolean isRegisterByteSizeCheap)
{
+      this.isConsistentWithEqual = isConsistentWithEqual;
+      this.isRegisterByteSizeCheap = isRegisterByteSizeCheap;
+    }
+
+    @Override
+    public void encode(CustomWindow window, OutputStream outStream) throws IOException {
+      INTERVAL_WINDOW_CODER.encode(window, outStream);
+      BooleanCoder.of().encode(window.isBig, outStream);
+    }
+
+    @Override
+    public CustomWindow decode(InputStream inStream) throws IOException {
+      IntervalWindow superWindow = INTERVAL_WINDOW_CODER.decode(inStream);
+      boolean isBig = BooleanCoder.of().decode(inStream);
+      return new CustomWindow(superWindow.start(), superWindow.end(), isBig);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      INTERVAL_WINDOW_CODER.verifyDeterministic();
+      BooleanCoder.of().verifyDeterministic();
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return isConsistentWithEqual;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(CustomWindow value) {
+      return isRegisterByteSizeCheap;
+    }
+
+    @Override
+    public void registerByteSizeObserver(CustomWindow value, ElementByteSizeObserver observer)
+        throws Exception {
+      observer.update(REGISTER_BYTE_SIZE);
+    }
+  }
+
+  private static final List<CustomWindow> CUSTOM_WINDOW_LIST =
+      Lists.newArrayList(
+          new CustomWindow(new Instant(0L), new Instant(1L), true),
+          new CustomWindow(new Instant(100L), new Instant(200L), false),
+          new CustomWindow(new Instant(0L), BoundedWindow.TIMESTAMP_MAX_VALUE, true));
+
+  @Test
+  public void testEncodeAndDecode() throws Exception {
+    List<IntervalWindow> intervalWindowsToTest =
+        Lists.newArrayList(
+            new IntervalWindow(new Instant(0L), new Instant(1L)),
+            new IntervalWindow(new Instant(100L), new Instant(200L)),
+            new IntervalWindow(new Instant(0L), BoundedWindow.TIMESTAMP_MAX_VALUE));
+    TimestampPrefixingWindowCoder<IntervalWindow> coder1 =
+        TimestampPrefixingWindowCoder.of(IntervalWindow.getCoder());
+    for (IntervalWindow window : intervalWindowsToTest) {
+      CoderProperties.coderDecodeEncodeEqual(coder1, window);
+    }
+
+    GlobalWindow globalWindow = GlobalWindow.INSTANCE;
+    TimestampPrefixingWindowCoder<GlobalWindow> coder2 =
+        TimestampPrefixingWindowCoder.of(GlobalWindow.Coder.INSTANCE);
+    CoderProperties.coderDecodeEncodeEqual(coder2, globalWindow);
+    TimestampPrefixingWindowCoder<CustomWindow> coder3 =
+        TimestampPrefixingWindowCoder.of(CustomWindowCoder.of(true, true));
+    for (CustomWindow window : CUSTOM_WINDOW_LIST) {
+      CoderProperties.coderDecodeEncodeEqual(coder3, window);
+    }
+  }
+
+  @Test
+  public void testConsistentWithEquals() {
+    TimestampPrefixingWindowCoder<CustomWindow> coder1 =
+        TimestampPrefixingWindowCoder.of(CustomWindowCoder.of(true, true));
+    assertThat(coder1.consistentWithEquals(), equalTo(true));
+    TimestampPrefixingWindowCoder<CustomWindow> coder2 =
+        TimestampPrefixingWindowCoder.of(CustomWindowCoder.of(false, true));
+    assertThat(coder2.consistentWithEquals(), equalTo(false));
+  }
+
+  @Test
+  public void testIsRegisterByteSizeObserverCheap() {
+    TimestampPrefixingWindowCoder<CustomWindow> coder1 =
+        TimestampPrefixingWindowCoder.of(CustomWindowCoder.of(true, true));
+    assertThat(coder1.isRegisterByteSizeObserverCheap(CUSTOM_WINDOW_LIST.get(0)), equalTo(true));
+    TimestampPrefixingWindowCoder<CustomWindow> coder2 =
+        TimestampPrefixingWindowCoder.of(CustomWindowCoder.of(true, false));
+    assertThat(coder2.isRegisterByteSizeObserverCheap(CUSTOM_WINDOW_LIST.get(0)), equalTo(false));
+  }
+
+  @Test
+  public void testGetEncodedElementByteSize() throws Exception {
+    TestElementByteSizeObserver observer = new TestElementByteSizeObserver();
+    TimestampPrefixingWindowCoder<CustomWindow> coder =
+        TimestampPrefixingWindowCoder.of(CustomWindowCoder.of(true, true));
+    for (CustomWindow value : CUSTOM_WINDOW_LIST) {
+      coder.registerByteSizeObserver(value, observer);
+      observer.advance();
+      assertThat(
+          observer.getSumAndReset(),
+          equalTo(
+              CustomWindowCoder.REGISTER_BYTE_SIZE
+                  + InstantCoder.of().getEncodedElementByteSize(value.maxTimestamp())));
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/IntervalWindowTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/IntervalWindowTest.java
index 36daef0..a79ff6f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/IntervalWindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/IntervalWindowTest.java
@@ -22,10 +22,13 @@ import static org.hamcrest.Matchers.equalTo;
 
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DurationCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.CoderProperties.TestElementByteSizeObserver;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -87,4 +90,20 @@ public class IntervalWindowTest {
     assertThat(encodedHourWindow.length, equalTo(encodedStart.length + encodedHourEnd.length - 4));
     assertThat(encodedDayWindow.length, equalTo(encodedStart.length + encodedDayEnd.length - 4));
   }
+
+  @Test
+  public void testCoderRegisterByteSizeObserver() throws Exception {
+    assertThat(TEST_CODER.isRegisterByteSizeObserverCheap(TEST_VALUES.get(0)), equalTo(true));
+    TestElementByteSizeObserver observer = new TestElementByteSizeObserver();
+    TestElementByteSizeObserver observer2 = new TestElementByteSizeObserver();
+    for (IntervalWindow window : TEST_VALUES) {
+      TEST_CODER.registerByteSizeObserver(window, observer);
+      InstantCoder.of().registerByteSizeObserver(window.maxTimestamp(), observer2);
+      DurationCoder.of()
+          .registerByteSizeObserver(new Duration(window.start(), window.end()), observer2);
+      observer.advance();
+      observer2.advance();
+      assertThat(observer.getSumAndReset(), equalTo(observer2.getSumAndReset()));
+    }
+  }
 }
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 0cf0208..618ee55 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -1483,3 +1483,31 @@ class ShardedKeyCoderImpl(StreamCoderImpl):
     estimated_size += (
         self._key_coder_impl.estimate_size(value.key, nested=True))
     return estimated_size
+
+
+class TimestampPrefixingWindowCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for custom window types, which prefix required max_timestamp to
+  encoded original window.
+
+  The coder encodes and decodes custom window types with following format:
+    window's max_timestamp()
+    encoded window using it's own coder.
+  """
+  def __init__(self, window_coder_impl: CoderImpl) -> None:
+    self._window_coder_impl = window_coder_impl
+
+  def encode_to_stream(self, value, stream, nested):
+    TimestampCoderImpl().encode_to_stream(value.max_timestamp(), stream, nested)
+    self._window_coder_impl.encode_to_stream(value, stream, nested)
+
+  def decode_from_stream(self, stream, nested):
+    TimestampCoderImpl().decode_from_stream(stream, nested)
+    return self._window_coder_impl.decode_from_stream(stream, nested)
+
+  def estimate_size(self, value: Any, nested: bool = False) -> int:
+    estimated_size = 0
+    estimated_size += TimestampCoderImpl().estimate_size(value)
+    estimated_size += self._window_coder_impl.estimate_size(value, nested)
+    return estimated_size
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 163fd17..2d2b336 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -1509,3 +1509,47 @@ class ShardedKeyCoder(FastCoder):
 
 Coder.register_structured_urn(
     common_urns.coders.SHARDED_KEY.urn, ShardedKeyCoder)
+
+
+class TimestampPrefixingWindowCoder(FastCoder):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Coder which prefixes the max timestamp of arbitrary window to its encoded
+  form."""
+  def __init__(self, window_coder: Coder) -> None:
+    self._window_coder = window_coder
+
+  def _create_impl(self):
+    return coder_impl.TimestampPrefixingWindowCoderImpl(
+        self._window_coder.get_impl())
+
+  def to_type_hint(self):
+    return self._window_coder.to_type_hint()
+
+  def _get_component_coders(self) -> List[Coder]:
+    return [self._window_coder]
+
+  def is_deterministic(self) -> bool:
+    return self._window_coder.is_deterministic()
+
+  def as_cloud_object(self, coders_context=None):
+    return {
+        '@type': 'kind:custom_window',
+        'component_encodings': [
+            self._window_coder.as_cloud_object(coders_context)
+        ],
+    }
+
+  def __repr__(self):
+    return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder
+
+  def __eq__(self, other):
+    return (
+        type(self) == type(other) and self._window_coder == other._window_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._window_coder))
+
+
+Coder.register_structured_urn(
+    common_urns.coders.CUSTOM_WINDOW.urn, TimestampPrefixingWindowCoder)
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 98e0547..11334ed 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -749,6 +749,19 @@ class CodersTest(unittest.TestCase):
             coders.TupleCoder((coder, other_coder)),
             (ShardedKey(key, b'123'), ShardedKey(other_key, b'')))
 
+  def test_timestamp_prefixing_window_coder(self):
+    self.check_coder(
+        coders.TimestampPrefixingWindowCoder(coders.IntervalWindowCoder()),
+        *[
+            window.IntervalWindow(x, y) for x in [-2**52, 0, 2**52]
+            for y in range(-100, 100)
+        ])
+    self.check_coder(
+        coders.TupleCoder((
+            coders.TimestampPrefixingWindowCoder(
+                coders.IntervalWindowCoder()), )),
+        (window.IntervalWindow(0, 10), ))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 1a74dbe..454939f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -189,7 +189,9 @@ class StandardCodersTest(unittest.TestCase):
       'beam:coder:double:v1': parse_float,
       'beam:coder:sharded_key:v1': lambda x,
       value_parser: ShardedKey(
-          key=value_parser(x['key']), shard_id=x['shardId'].encode('utf-8'))
+          key=value_parser(x['key']), shard_id=x['shardId'].encode('utf-8')),
+      'beam:coder:custom_window:v1': lambda x,
+      window_parser: window_parser(x['window'])
   }
 
   def test_standard_coders(self):

Mime
View raw message