beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/3] beam git commit: Expand all PValues to component PCollections always
Date Thu, 08 Jun 2017 16:14:11 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 6c385d7..1853248 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionView;
 class StreamingViewOverrides {
   static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
       extends SingleInputOutputOverrideFactory<
-          PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT,
ViewT>> {
+          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT,
ViewT>> {
     @Override
-    public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
+    public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT,
ViewT>>
+                    PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT,
ViewT>>
                 transform) {
       StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
           new StreamingCreatePCollectionView<>(transform.getTransform().getView());
@@ -56,7 +56,7 @@ class StreamingViewOverrides {
     }
 
     private static class StreamingCreatePCollectionView<ElemT, ViewT>
-        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>>
{
+        extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
       private final PCollectionView<ViewT> view;
 
       private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
@@ -64,7 +64,7 @@ class StreamingViewOverrides {
       }
 
       @Override
-      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+      public PCollection<ElemT> expand(PCollection<ElemT> input) {
         return input
             .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
             .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 89dc2d5..53215f6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -920,15 +920,15 @@ public class DataflowPipelineTranslatorTest implements Serializable
{
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(5, steps.size());
+    assertEquals(9, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(4);
+    Step collectionToSingletonStep = steps.get(8);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 64aa35a..ac5e0cd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -508,50 +508,6 @@ public final class TransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton()
{
-    return new TransformEvaluator<View.AsSingleton<T>>() {
-      @Override
-      public void evaluate(View.AsSingleton<T> transform, EvaluationContext context)
{
-        Iterable<? extends WindowedValue<?>> iter =
-        context.getWindowedValues(context.getInput(transform));
-        PCollectionView<T> output = context.getOutput(transform);
-        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
-
-        @SuppressWarnings("unchecked")
-        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>)
iter;
-
-        context.putPView(output, iterCast, coderInternal);
-      }
-
-      @Override
-      public String toNativeString() {
-        return "collect()";
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter()
{
-    return new TransformEvaluator<View.AsIterable<T>>() {
-      @Override
-      public void evaluate(View.AsIterable<T> transform, EvaluationContext context)
{
-        Iterable<? extends WindowedValue<?>> iter =
-            context.getWindowedValues(context.getInput(transform));
-        PCollectionView<Iterable<T>> output = context.getOutput(transform);
-        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
-
-        @SuppressWarnings("unchecked")
-        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>)
iter;
-
-        context.putPView(output, iterCast, coderInternal);
-      }
-
-      @Override
-      public String toNativeString() {
-        return "collect()";
-      }
-    };
-  }
-
   private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT,
WriteT>>
   createPCollView() {
     return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>()
{
@@ -560,7 +516,7 @@ public final class TransformTranslator {
                            EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        PCollectionView<WriteT> output = context.getOutput(transform);
+        PCollectionView<WriteT> output = transform.getView();
         Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
 
         @SuppressWarnings("unchecked")
@@ -645,8 +601,8 @@ public final class TransformTranslator {
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
     EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
     EVALUATORS.put(Create.Values.class, create());
-    EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
-    EVALUATORS.put(View.AsIterable.class, viewAsIter());
+//    EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
+//    EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
     EVALUATORS.put(Window.Assign.class, window());
     EVALUATORS.put(Reshuffle.class, reshuffle());

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 2f0e8ef..ee1ce7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -24,10 +24,12 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,6 +41,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -165,7 +168,7 @@ public class TransformHierarchy {
    * nodes.
    */
   public void setOutput(POutput output) {
-    for (PValue value : output.expand().values()) {
+    for (PCollection<?> value : fullyExpand(output).values()) {
       if (!producers.containsKey(value)) {
         producers.put(value, current);
         value.finishSpecifyingOutput(
@@ -226,6 +229,47 @@ public class TransformHierarchy {
     return current;
   }
 
+  private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output)
{
+    Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>();
+    for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) {
+      if (value.getValue() instanceof PCollection) {
+        PCollection<?> previous = result.put(value.getKey(), (PCollection<?>)
value.getValue());
+        checkArgument(
+            previous == null,
+            "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
+            output,
+            TupleTag.class.getSimpleName(),
+            value.getKey(),
+            previous,
+            value.getValue());
+      } else {
+        if (value.getValue().expand().size() == 1
+            && Iterables.getOnlyElement(value.getValue().expand().values())
+                .equals(value.getValue())) {
+          throw new IllegalStateException(
+              String.format(
+                  "Non %s %s that expands into itself %s",
+                  PCollection.class.getSimpleName(),
+                  PValue.class.getSimpleName(),
+                  value.getValue()));
+        }
+        for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent :
+            fullyExpand(value.getValue()).entrySet()) {
+          PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue());
+          checkArgument(
+              previous == null,
+              "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
+              output,
+              TupleTag.class.getSimpleName(),
+              valueComponent.getKey(),
+              previous,
+              valueComponent.getValue());
+        }
+      }
+    }
+    return result;
+  }
+
   /**
    * Provides internal tracking of transform relationships with helper methods
    * for initialization and ordered visitation.

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 9e1cc71..6a90bcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1277,14 +1277,15 @@ public class Combine {
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
-      return combined.apply(
-          CreatePCollectionView.<OutputT, OutputT>of(
-              PCollectionViews.singletonView(
-                  combined,
-                  input.getWindowingStrategy(),
-                  insertDefault,
-                  insertDefault ? fn.defaultValue() : null,
-                  combined.getCoder())));
+      PCollectionView<OutputT> view =
+          PCollectionViews.singletonView(
+              combined,
+              input.getWindowingStrategy(),
+              insertDefault,
+              insertDefault ? fn.defaultValue() : null,
+              combined.getCoder());
+      combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
+      return view;
     }
 
     public int getFanout() {

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 073c750..331b143 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -257,8 +257,10 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input",
e);
       }
 
-      return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
-          input, input.getWindowingStrategy(), input.getCoder())));
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
+      input.apply(CreatePCollectionView.<T, List<T>>of(view));
+      return view;
     }
   }
 
@@ -282,8 +284,10 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input",
e);
       }
 
-      return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
-          input, input.getWindowingStrategy(), input.getCoder())));
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
+      input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
+      return view;
     }
   }
 
@@ -423,11 +427,10 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input",
e);
       }
 
-      return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
-          PCollectionViews.multimapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder())));
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
+      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+      return view;
     }
   }
 
@@ -459,11 +462,10 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input",
e);
       }
 
-      return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
-          PCollectionViews.mapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder())));
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
+      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
+      return view;
     }
   }
 
@@ -480,7 +482,7 @@ public class View {
    */
   @Internal
   public static class CreatePCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
     private PCollectionView<ViewT> view;
 
     private CreatePCollectionView(PCollectionView<ViewT> view) {
@@ -506,8 +508,10 @@ public class View {
     }
 
     @Override
-    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
-      return view;
+    public PCollection<ElemT> expand(PCollection<ElemT> input) {
+      return PCollection.<ElemT>createPrimitiveOutputInternal(
+              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+          .setCoder(input.getCoder());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index f210fd8..4063d11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.values;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.Collections;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
@@ -226,6 +228,11 @@ public class PCollection<T> extends PValueBase implements PValue
{
     return super.getName();
   }
 
+  @Override
+  public final Map<TupleTag<?>, PValue> expand() {
+    return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
+  }
+
   /**
    * Sets the name of this {@link PCollection}.  Returns {@code this}.
    *
@@ -314,6 +321,11 @@ public class PCollection<T> extends PValueBase implements PValue
{
 
   private IsBounded isBounded;
 
+  /**
+   * A local {@link TupleTag} used in the expansion of this {@link PValueBase}.
+   */
+  private final TupleTag<?> tag = new TupleTag<>();
+
   private PCollection(Pipeline p) {
     super(p);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 74887c7..5e2e2c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -170,6 +170,15 @@ public class PCollectionViews {
     }
 
     /**
+     * Returns if a default value was specified.
+     */
+    @Deprecated
+    @Internal
+    public boolean hasDefault() {
+      return hasDefault;
+    }
+
+    /**
      * Returns the default value that was specified.
      *
      * <p>For internal use only.
@@ -491,5 +500,10 @@ public class PCollectionViews {
     public String toString() {
       return MoreObjects.toStringHelper(this).add("tag", tag).toString();
     }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 6f638d7..f312eac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.values;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.Collections;
-import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -87,11 +85,6 @@ public abstract class PValueBase implements PValue {
   private String name;
 
   /**
-   * A local {@link TupleTag} used in the expansion of this {@link PValueBase}.
-   */
-  private TupleTag<?> tag = new TupleTag<>();
-
-  /**
    * Whether this {@link PValueBase} has been finalized, and its core
    * properties, e.g., name, can no longer be changed.
    */
@@ -108,11 +101,6 @@ public abstract class PValueBase implements PValue {
   }
 
   @Override
-  public final Map<TupleTag<?>, PValue> expand() {
-    return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
-  }
-
-  @Override
   public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
     finishedSpecifying = true;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index adf27f8..aaf8b91 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -22,7 +22,9 @@ import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -349,5 +352,10 @@ public final class PCollectionViewTesting {
           .add("viewFn", viewFn)
           .toString();
     }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
+    }
   }
 }


Mime
View raw message