beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Add ParDos
Date Thu, 18 May 2017 22:41:31 GMT
Add ParDos

Add ParDoPayloadTranslator to PTransformTranslator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/790e7fe6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/790e7fe6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/790e7fe6

Branch: refs/heads/master
Commit: 790e7fe6653b926044d3dfecdccbc2fda9c998f0
Parents: 6a7eeeb
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Mar 21 15:06:58 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu May 18 15:41:17 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/PTransforms.java  |   8 +-
 .../beam/runners/core/construction/ParDos.java  | 317 +++++++++++++++++++
 .../construction/RunnerPCollectionView.java     |  88 +++++
 .../runners/core/construction/ParDosTest.java   | 229 ++++++++++++++
 .../src/main/proto/beam_runner_api.proto        |  10 +
 .../sdk/transforms/windowing/GlobalWindows.java |   6 +-
 6 files changed, 653 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
index d25d342..16276b9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
@@ -24,10 +24,12 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.ParDos.ParDoPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -39,9 +41,9 @@ import org.apache.beam.sdk.values.TupleTag;
 public class PTransforms {
   private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
       KNOWN_PAYLOAD_TRANSLATORS =
-          ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder().build();
-  // TODO: ParDoPayload, WindowIntoPayload, ReadPayload, CombinePayload
-  // TODO: "Flatten Payload", etc?
+          ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
+              .put(ParDo.MultiOutput.class, ParDoPayloadTranslator.create())
+              .build();
   // TODO: Load via service loader.
   private PTransforms() {}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
new file mode 100644
index 0000000..b2b29df
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
+ */
+public class ParDos {
+  /**
+   * The URN for a {@link ParDoPayload}.
+   */
+  public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1";
+  /**
+   * The URN for an unknown Java {@link DoFn}.
+   */
+  public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
+  /**
+   * The URN for an unknown Java {@link ViewFn}.
+   */
+  public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
+  /**
+   * The URN for an unknown Java {@link WindowMappingFn}.
+   */
+  public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
+      "urn:beam:windowmappingfn:javasdk:0.1";
+
+  /**
+   * A {@link TransformPayloadTranslator} for {@link ParDo}.
+   */
+  public static class ParDoPayloadTranslator
+      implements PTransforms.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>>
{
+    public static TransformPayloadTranslator create() {
+      return new ParDoPayloadTranslator();
+    }
+
+    private ParDoPayloadTranslator() {}
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components)
{
+      ParDoPayload payload = toProto(transform.getTransform(), components);
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PAR_DO_PAYLOAD_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+  }
+
+  public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
{
+    DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
+    Map<String, StateDeclaration> states = signature.stateDeclarations();
+    Map<String, TimerDeclaration> timers = signature.timerDeclarations();
+    List<Parameter> parameters = signature.processElement().extraParameters();
+
+    ParDoPayload.Builder builder = ParDoPayload.newBuilder();
+    builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
+    for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
+      builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
+    }
+    for (Parameter parameter : parameters) {
+      Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
+      if (protoParameter.isPresent()) {
+        builder.addParameters(protoParameter.get());
+      }
+    }
+    for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
+      StateSpec spec = toProto(state.getValue());
+      builder.putStateSpecs(state.getKey(), spec);
+    }
+    for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
+      TimerSpec spec = toProto(timer.getValue());
+      builder.putTimerSpecs(timer.getKey(), spec);
+    }
+    return builder.build();
+  }
+
+  public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException
{
+    return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
+  }
+
+  public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
+      throws InvalidProtocolBufferException {
+    return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
+  }
+
+  // TODO: Implement
+  private static StateSpec toProto(StateDeclaration state) {
+    throw new UnsupportedOperationException("Not yet supported");
+  }
+
+  // TODO: Implement
+  private static TimerSpec toProto(TimerDeclaration timer) {
+    throw new UnsupportedOperationException("Not yet supported");
+  }
+
+  @AutoValue
+  abstract static class DoFnAndMainOutput implements Serializable {
+    public static DoFnAndMainOutput of(
+        DoFn<?, ?> fn, TupleTag<?> tag) {
+      return new AutoValue_ParDos_DoFnAndMainOutput(fn, tag);
+    }
+
+    abstract DoFn<?, ?> getDoFn();
+    abstract TupleTag<?> getMainOutputTag();
+  }
+
+  private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_DO_FN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(
+                                        DoFnAndMainOutput.of(fn, tag))))
+                            .build())))
+        .build();
+  }
+
+  private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec)
+      throws InvalidProtocolBufferException {
+    checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
+    byte[] serializedFn =
+        fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+    return (DoFnAndMainOutput)
+        SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output
tag");
+  }
+
+  private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
+    return parameter.match(
+        new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
+          @Override
+          public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
+            return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
+          }
+
+          @Override
+          public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter
p) {
+            return Optional.of(
+                RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
+          }
+
+          @Override
+          protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) {
+            return Optional.absent();
+          }
+        });
+  }
+
+  private static SideInput toProto(PCollectionView<?> view) {
+    Builder builder = SideInput.newBuilder();
+    builder.setAccessPattern(
+        FunctionSpec.newBuilder()
+            .setUrn(view.getViewFn().getMaterialization().getUrn())
+            .build());
+    builder.setViewFn(toProto(view.getViewFn()));
+    builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
+    return builder.build();
+  }
+
+  public static PCollectionView<?> fromProto(
+      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+      throws IOException {
+    TupleTag<?> tag = new TupleTag<>(id);
+    WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
+    ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
+
+    RunnerApi.PCollection inputCollection =
+        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+    WindowingStrategy<?, ?> windowingStrategy =
+        WindowingStrategies.fromProto(
+            components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
+            components);
+    Coder<?> elemCoder =
+        Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
+    Coder<Iterable<WindowedValue<?>>> coder =
+        (Coder)
+            IterableCoder.of(
+                FullWindowedValueCoder.of(
+                    elemCoder, windowingStrategy.getWindowFn().windowCoder()));
+    checkArgument(
+        sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+        "Unknown View Materialization URN %s",
+        sideInput.getAccessPattern().getUrn());
+
+    PCollectionView<?> view =
+        new RunnerPCollectionView<>(
+            (TupleTag<Iterable<WindowedValue<?>>>) tag,
+            (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+            windowMappingFn,
+            windowingStrategy,
+            coder);
+    return view;
+  }
+
+  private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
+                            .build())))
+        .build();
+  }
+
+  private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
+      throws InvalidProtocolBufferException {
+    FunctionSpec spec = viewFn.getSpec();
+    checkArgument(
+        spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
+        "Can't deserialize unknown %s type %s",
+        ViewFn.class.getSimpleName(),
+        spec.getUrn());
+    return (ViewFn<?, ?>)
+        SerializableUtils.deserializeFromByteArray(
+            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom
ViewFn");
+  }
+
+  private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(windowMappingFn)))
+                            .build())))
+        .build();
+  }
+
+  private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
+      throws InvalidProtocolBufferException {
+    FunctionSpec spec = windowMappingFn.getSpec();
+    checkArgument(
+        spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
+        "Can't deserialize unknown %s type %s",
+        WindowMappingFn.class.getSimpleName(),
+        spec.getUrn());
+    return (WindowMappingFn<?>)
+        SerializableUtils.deserializeFromByteArray(
+            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+            "Custom WinodwMappingFn");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
new file mode 100644
index 0000000..89e8784
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/** A {@link PCollectionView} created from the components of a {@link SideInput}. */
+class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
{
+  private final TupleTag<Iterable<WindowedValue<?>>> tag;
+  private final ViewFn<Iterable<WindowedValue<?>>, T> viewFn;
+  private final WindowMappingFn<?> windowMappingFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final Coder<Iterable<WindowedValue<?>>> coder;
+
+  /**
+   * Create a new {@link RunnerPCollectionView} from the provided components.
+   */
+  RunnerPCollectionView(
+      TupleTag<Iterable<WindowedValue<?>>> tag,
+      ViewFn<Iterable<WindowedValue<?>>, T> viewFn,
+      WindowMappingFn<?> windowMappingFn,
+      @Nullable WindowingStrategy<?, ?> windowingStrategy,
+      @Nullable Coder<Iterable<WindowedValue<?>>> coder) {
+    this.tag = tag;
+    this.viewFn = viewFn;
+    this.windowMappingFn = windowMappingFn;
+    this.windowingStrategy = windowingStrategy;
+    this.coder = coder;
+  }
+
+  @Nullable
+  @Override
+  public PCollection<?> getPCollection() {
+    throw new IllegalStateException(
+        String.format("Cannot call getPCollection on a %s", getClass().getSimpleName()));
+  }
+
+  @Override
+  public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
+    return tag;
+  }
+
+  @Override
+  public ViewFn<Iterable<WindowedValue<?>>, T> getViewFn() {
+    return viewFn;
+  }
+
+  @Override
+  public WindowMappingFn<?> getWindowMappingFn() {
+    return windowMappingFn;
+  }
+
+  @Override
+  public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
+    return windowingStrategy;
+  }
+
+  @Override
+  public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
+    return coder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
new file mode 100644
index 0000000..74edec1
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link ParDos}. */
+@RunWith(Parameterized.class)
+public class ParDosTest {
+  public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  private static PCollectionView<Long> singletonSideInput =
+      p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
+          .apply(View.<Long>asSingleton());
+  private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput
=
+      p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
+          .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
+          .apply(View.<Long, String>asMultimap());
+
+  private static PCollection<KV<Long, String>> mainInput =
+      p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ParDo.MultiOutput<?, ?>> data() {
+    return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
+        ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>()
{}))
+            .withSideInputs(singletonSideInput, multimapSideInput),
+        ParDo.of(new DropElementsFn())
+            .withOutputTags(
+                new TupleTag<Void>(),
+                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>()
{})));
+  }
+
+  @Parameter(0)
+  public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
+
+  @Test
+  public void testToAndFromProto() throws Exception {
+    SdkComponents components = SdkComponents.create();
+    ParDoPayload payload = ParDos.toProto(parDo, components);
+
+    assertThat(ParDos.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
+    assertThat(
+        ParDos.getMainOutputTag(payload), Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      payload.getSideInputsOrThrow(view.getTagInternal().getId());
+    }
+  }
+
+  @Test
+  public void toAndFromTransformProto() throws Exception {
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
+    inputs.putAll(parDo.getAdditionalInputs());
+    PCollectionTuple output = mainInput.apply(parDo);
+
+    SdkComponents components = SdkComponents.create();
+    String transformId =
+        components.registerPTransform(
+            AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>,
MultiOutput>of(
+                "foo", inputs, output.expand(), parDo, p),
+            Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+    Components protoComponents = components.toComponents();
+    RunnerApi.PTransform protoTransform =
+        protoComponents.getTransformsOrThrow(transformId);
+    ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    for (PCollectionView<?> view : parDo.getSideInputs()) {
+      SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
+      PCollectionView<?> restoredView =
+          ParDos.fromProto(
+              sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+      assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+      assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
+      assertThat(
+          restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
+      assertThat(
+          restoredView.getWindowingStrategyInternal(),
+          Matchers.<WindowingStrategy<?, ?>>equalTo(
+              view.getWindowingStrategyInternal().fixDefaults()));
+      assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+    }
+  }
+
+  private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
+    @ProcessElement
+    public void proc(ProcessContext context, BoundedWindow window) {
+      context.output(null);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof DropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return DropElementsFn.class.hashCode();
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void>
{
+    private static final String BAG_STATE_ID = "bagState";
+    private static final String COMBINING_STATE_ID = "combiningState";
+    private static final String EVENT_TIMER_ID = "eventTimer";
+    private static final String PROCESSING_TIMER_ID = "processingTimer";
+
+    @StateId(BAG_STATE_ID)
+    private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of());
+
+    @StateId(COMBINING_STATE_ID)
+    private final StateSpec<CombiningState<Long, long[], Long>> combiningState
=
+        StateSpecs.combining(
+            new BinaryCombineLongFn() {
+              @Override
+              public long apply(long left, long right) {
+                return Math.max(left, right);
+              }
+
+              @Override
+              public long identity() {
+                return Long.MIN_VALUE;
+              }
+            });
+
+    @TimerId(EVENT_TIMER_ID)
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @TimerId(PROCESSING_TIMER_ID)
+    private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void dropInput(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId(BAG_STATE_ID) BagState<String> bagStateState,
+        @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState,
+        @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer,
+        @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) {
+      context.output(null);
+    }
+
+    @OnTimer(EVENT_TIMER_ID)
+    public void onEventTime(OnTimerContext context) {}
+
+    @OnTimer(PROCESSING_TIMER_ID)
+    public void onProcessingTime(OnTimerContext context) {}
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateTimerDropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return StateTimerDropElementsFn.class.hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index bf4df2a..c8722e6 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -248,10 +248,20 @@ message Parameter {
 
 message StateSpec {
   // TODO: AST for state spec
+  string id = 1;
+  Type type = 2;
+
+  enum Type {
+    VALUE = 0;
+    BAG = 1;
+    MAP = 2;
+    SET = 3;
+  }
 }
 
 message TimerSpec {
   // TODO: AST for timer spec
+  string id = 1;
 }
 
 enum IsBounded {

http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 1103a24..d48d26b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
+import com.google.auto.value.AutoValue;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.Coder;
@@ -61,10 +62,11 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow>
{
 
   @Override
   public WindowMappingFn<GlobalWindow> getDefaultWindowMappingFn() {
-    return new GlobalWindowMappingFn();
+    return new AutoValue_GlobalWindows_GlobalWindowMappingFn();
   }
 
-  static class GlobalWindowMappingFn extends WindowMappingFn<GlobalWindow> {
+  @AutoValue
+  abstract static class GlobalWindowMappingFn extends WindowMappingFn<GlobalWindow>
{
     @Override
     public GlobalWindow getSideInputWindow(BoundedWindow mainWindow) {
       return GlobalWindow.INSTANCE;


Mime
View raw message