beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/5] beam git commit: Unify Java and Python WindowingStrategy representations.
Date Fri, 02 Jun 2017 16:17:08 GMT
Repository: beam
Updated Branches:
  refs/heads/master 7a075cc34 -> 2f9428c3e


Unify Java and Python WindowingStrategy representations.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de757860
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de757860
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de757860

Branch: refs/heads/master
Commit: de757860945d5966a51173c54d29d0a733e66686
Parents: 7a075cc
Author: Robert Bradshaw <robertwb@google.com>
Authored: Wed May 24 17:23:31 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Fri Jun 2 09:16:42 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  6 ++
 runners/core-construction-java/pom.xml          |  5 ++
 .../WindowingStrategyTranslation.java           | 60 ++++++++++++++------
 .../src/main/proto/beam_known_payloads.proto    | 53 +++++++++++++++++
 .../runners/dataflow/dataflow_runner.py         | 39 ++++++++++++-
 .../runners/dataflow/dataflow_runner_test.py    | 11 ++++
 sdks/python/apache_beam/transforms/window.py    | 57 ++++++++++++-------
 sdks/python/apache_beam/utils/proto_utils.py    |  6 ++
 sdks/python/apache_beam/utils/urns.py           | 10 ++--
 sdks/python/run_pylint.sh                       |  2 +
 10 files changed, 206 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3e302e7..805a8d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,6 +945,12 @@
       </dependency>
 
       <dependency>
+        <groupId>com.google.protobuf</groupId>
+        <artifactId>protobuf-java-util</artifactId>
+        <version>${protobuf.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.api.grpc</groupId>
         <artifactId>grpc-google-common-protos</artifactId>
         <version>${grpc-google-common-protos.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 7eaa6f3..67951e9 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,6 +70,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index e92565f..a226624 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.Timestamps;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -30,6 +30,11 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -153,9 +158,13 @@ public class WindowingStrategyTranslation implements Serializable {
     }
   }
 
-  // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+  public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1";
+  public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1";
+  public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1";
+  public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1";
+  // This URN says that the WindowFn is just a UDF blob the Java SDK understands
   // TODO: standardize such things
-  public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+  public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
 
   /**
    * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -168,7 +177,7 @@ public class WindowingStrategyTranslation implements Serializable {
         // TODO: Set environment ID
         .setSpec(
             FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_WINDOWFN_URN)
+                .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
                 .setParameter(
                     Any.pack(
                         BytesValue.newBuilder()
@@ -261,18 +270,37 @@ public class WindowingStrategyTranslation implements Serializable {
 
   public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
       throws InvalidProtocolBufferException {
-    checkArgument(
-        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
-        "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
-        WindowFn.class.getSimpleName(),
-        CUSTOM_WINDOWFN_URN,
-        windowFnSpec.getSpec().getUrn());
-
-    Object deserializedWindowFn =
-        SerializableUtils.deserializeFromByteArray(
+    switch (windowFnSpec.getSpec().getUrn()) {
+      case GLOBAL_WINDOWS_FN:
+        return new GlobalWindows();
+      case FIXED_WINDOWS_FN:
+        RunnerApiPayloads.FixedWindowsPayload fixedParams =
+            windowFnSpec.getSpec().getParameter().unpack(
+                RunnerApiPayloads.FixedWindowsPayload.class);
+        return FixedWindows.of(
+            Duration.millis(Durations.toMillis(fixedParams.getSize())))
+            .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
+      case SLIDING_WINDOWS_FN:
+        RunnerApiPayloads.SlidingWindowsPayload slidingParams =
+            windowFnSpec.getSpec().getParameter().unpack(
+                RunnerApiPayloads.SlidingWindowsPayload.class);
+        return SlidingWindows.of(
+            Duration.millis(Durations.toMillis(slidingParams.getSize())))
+            .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
+            .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
+      case SESSION_WINDOWS_FN:
+        RunnerApiPayloads.SessionsPayload sessionParams =
+            windowFnSpec.getSpec().getParameter().unpack(
+                RunnerApiPayloads.SessionsPayload.class);
+        return Sessions.withGapDuration(
+            Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
+      case SERIALIZED_JAVA_WINDOWFN_URN:
+        return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
             windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
             "WindowFn");
-
-    return (WindowFn<?, ?>) deserializedWindowFn;
+      default:
+        throw new IllegalArgumentException(
+            "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
new file mode 100644
index 0000000..446bd59
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Runner API, which is the runner-independent,
+ * SDK-independent definition of the Beam model.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "RunnerApiPayloads";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+// beam:windowfn:global_windows:v0.1
+// empty payload
+
+// beam:windowfn:fixed_windows:v0.1
+message FixedWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+}
+
+// beam:windowfn:sliding_windows:v0.1
+message SlidingWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+  google.protobuf.Duration period = 3;
+}
+
+// beam:windowfn:session_windows:v0.1
+message SessionsPayload {
+  google.protobuf.Duration gap_size = 1;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 046d3d5..3e0e268 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -25,6 +25,7 @@ import logging
 import threading
 import time
 import traceback
+import urllib
 
 from apache_beam import error
 from apache_beam import coders
@@ -416,7 +417,9 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
     windowing = transform_node.transform.get_windowing(
         transform_node.inputs)
-    step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
+    step.add_property(
+        PropertyNames.SERIALIZED_FN,
+        self.serialize_windowing_strategy(windowing))
 
   def run_ParDo(self, transform_node):
     transform = transform_node.transform
@@ -697,6 +700,40 @@ class DataflowRunner(PipelineRunner):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
 
+  @classmethod
+  def serialize_windowing_strategy(cls, windowing):
+    from apache_beam.runners import pipeline_context
+    from apache_beam.runners.api import beam_runner_api_pb2
+    context = pipeline_context.PipelineContext()
+    windowing_proto = windowing.to_runner_api(context)
+    return cls.byte_array_to_json_string(
+        beam_runner_api_pb2.MessageWithComponents(
+            components=context.to_runner_api(),
+            windowing_strategy=windowing_proto).SerializeToString())
+
+  @classmethod
+  def deserialize_windowing_strategy(cls, serialized_data):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.runners import pipeline_context
+    from apache_beam.runners.api import beam_runner_api_pb2
+    from apache_beam.transforms.core import Windowing
+    proto = beam_runner_api_pb2.MessageWithComponents()
+    proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
+    return Windowing.from_runner_api(
+        proto.windowing_strategy,
+        pipeline_context.PipelineContext(proto.components))
+
+  @staticmethod
+  def byte_array_to_json_string(raw_bytes):
+    """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
+    return urllib.quote(raw_bytes)
+
+  @staticmethod
+  def json_string_to_byte_array(encoded_string):
+    """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
+    return urllib.unquote(encoded_string)
+
 
 class DataflowPipelineResult(PipelineResult):
   """Represents the state of a pipeline run on the Dataflow service."""

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ff4b51d..74fd01d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -38,6 +38,8 @@ from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_a
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.core import _GroupByKeyOnly
+from apache_beam.transforms.core import Windowing
+from apache_beam.transforms import window
 from apache_beam.typehints import typehints
 
 # Protect against environments where apitools library is not available.
@@ -240,6 +242,15 @@ class DataflowRunnerTest(unittest.TestCase):
     for _ in range(num_inputs):
       self.assertEqual(inputs[0].element_type, output_type)
 
+  def test_serialize_windowing_strategy(self):
+    # This just tests the basic path; more complete tests
+    # are in window_test.py.
+    strategy = Windowing(window.FixedWindows(10))
+    self.assertEqual(
+        strategy,
+        DataflowRunner.deserialize_windowing_strategy(
+            DataflowRunner.serialize_windowing_strategy(strategy)))
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 94187e0..f74c8a9 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -51,10 +51,12 @@ from __future__ import absolute_import
 
 import abc
 
-from google.protobuf import struct_pb2
+from google.protobuf import duration_pb2
+from google.protobuf import timestamp_pb2
 
 from apache_beam.coders import coders
 from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.runners.api import beam_known_payloads_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
@@ -341,14 +343,18 @@ class FixedWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.FIXED_WINDOWS_FN,
-            proto_utils.pack_Struct(size=self.size.micros,
-                                    offset=self.offset.micros))
-
-  @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct)
+            beam_known_payloads_pb2.FixedWindowsPayload(
+                size=proto_utils.from_micros(
+                    duration_pb2.Duration, self.size.micros),
+                offset=proto_utils.from_micros(
+                    timestamp_pb2.Timestamp, self.offset.micros)))
+
+  @urns.RunnerApiFn.register_urn(
+      urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return FixedWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']))
+        size=Duration(micros=fn_parameter.size.ToMicroseconds()),
+        offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()))
 
 
 class SlidingWindows(NonMergingWindowFn):
@@ -392,17 +398,22 @@ class SlidingWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.SLIDING_WINDOWS_FN,
-            proto_utils.pack_Struct(
-                size=self.size.micros,
-                offset=self.offset.micros,
-                period=self.period.micros))
-
-  @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct)
+            beam_known_payloads_pb2.SlidingWindowsPayload(
+                size=proto_utils.from_micros(
+                    duration_pb2.Duration, self.size.micros),
+                offset=proto_utils.from_micros(
+                    timestamp_pb2.Timestamp, self.offset.micros),
+                period=proto_utils.from_micros(
+                    duration_pb2.Duration, self.period.micros)))
+
+  @urns.RunnerApiFn.register_urn(
+      urns.SLIDING_WINDOWS_FN,
+      beam_known_payloads_pb2.SlidingWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return SlidingWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']),
-        period=Duration(micros=fn_parameter['period']))
+        size=Duration(micros=fn_parameter.size.ToMicroseconds()),
+        offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()),
+        period=Duration(micros=fn_parameter.period.ToMicroseconds()))
 
 
 class Sessions(WindowFn):
@@ -452,10 +463,14 @@ class Sessions(WindowFn):
     if type(self) == type(other) == Sessions:
       return self.gap_size == other.gap_size
 
-  @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct)
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
-
   def to_runner_api_parameter(self, context):
     return (urns.SESSION_WINDOWS_FN,
-            proto_utils.pack_Struct(gap_size=self.gap_size.micros))
+            beam_known_payloads_pb2.SessionsPayload(
+                gap_size=proto_utils.from_micros(
+                    duration_pb2.Duration, self.gap_size.micros)))
+
+  @urns.RunnerApiFn.register_urn(
+      urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload)
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return Sessions(
+        gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index 090a821..af8f218 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -53,3 +53,9 @@ def pack_Struct(**kwargs):
   for key, value in kwargs.items():
     msg[key] = value  # pylint: disable=unsubscriptable-object, unsupported-assignment-operation
   return msg
+
+
+def from_micros(cls, micros):
+  result = cls()
+  result.FromMicroseconds(micros)
+  return result

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 379b5ff..849b8e3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -26,11 +26,11 @@ from apache_beam.internal import pickler
 from apache_beam.utils import proto_utils
 
 
-PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
-GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
-FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
-SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
-SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
 
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 400c577..4ef3e7f 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -50,6 +50,8 @@ EXCLUDED_GENERATED_FILES=(
 "apache_beam/runners/api/beam_fn_api_pb2_grpc.py"
 "apache_beam/runners/api/beam_runner_api_pb2.py"
 "apache_beam/runners/api/beam_runner_api_pb2_grpc.py"
+"apache_beam/runners/api/beam_known_payloads_pb2.py"
+"apache_beam/runners/api/beam_known_payloads_pb2_grpc.py"
 )
 
 FILES_TO_IGNORE=""


Mime
View raw message