beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [beam] branch master updated: [BEAM-6195] Make ProcessRemoteBundleOperation map PCollectionId into correct OutputReceiver and throws Exception when there is more than one input PCollection. (#7223)
Date Sat, 08 Dec 2018 00:14:38 GMT
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 17968a2  [BEAM-6195] Make ProcessRemoteBundleOperation map PCollectionId into correct OutputReceiver and throws Exception when there is more than one input PCollection. (#7223)
17968a2 is described below

commit 17968a27c02d990f4880e8247a88508ff83c6cfd
Author: Boyuan Zhang <36090911+boyuanzz@users.noreply.github.com>
AuthorDate: Fri Dec 7 16:14:31 2018 -0800

    [BEAM-6195] Make ProcessRemoteBundleOperation map PCollectionId into correct OutputReceiver and throws Exception when there is more than one input PCollection. (#7223)
---
 .../dataflow/worker/BatchDataflowWorker.java       |  2 +-
 .../worker/BeamFnMapTaskExecutorFactory.java       | 17 ++---
 .../worker/IntrinsicMapTaskExecutorFactory.java    |  2 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |  2 +-
 .../fn/control/ProcessRemoteBundleOperation.java   | 20 +++---
 .../graph/CloneAmbiguousFlattensFunction.java      |  9 ++-
 .../graph/CreateExecutableStageNodeFunction.java   |  6 +-
 .../graph/CreateRegisterFnOperationFunction.java   |  6 +-
 ...nsertFetchAndFilterStreamingSideInputNodes.java |  3 +-
 .../worker/graph/LengthPrefixUnknownCoders.java    |  4 +-
 .../worker/graph/MapTaskToNetworkFunction.java     | 11 +++-
 .../beam/runners/dataflow/worker/graph/Nodes.java  | 19 ++++--
 .../IntrinsicMapTaskExecutorFactoryTest.java       | 18 ++++--
 .../graph/CloneAmbiguousFlattensFunctionTest.java  |  2 +-
 .../CreateRegisterFnOperationFunctionTest.java     |  2 +-
 .../graph/DeduceFlattenLocationsFunctionTest.java  |  2 +-
 .../graph/DeduceNodeLocationsFunctionTest.java     |  4 +-
 ...tFetchAndFilterStreamingSideInputNodesTest.java |  7 +-
 .../graph/LengthPrefixUnknownCodersTest.java       |  2 +-
 .../worker/graph/MapTaskToNetworkFunctionTest.java | 25 +++++---
 .../runners/dataflow/worker/graph/NodesTest.java   | 16 +++--
 .../RemoveFlattenInstructionsFunctionTest.java     | 74 +++++++++++++++-------
 .../ReplacePgbkWithPrecombineFunctionTest.java     |  2 +-
 23 files changed, 168 insertions(+), 87 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index f0e8ddb..00504d2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -97,7 +97,7 @@ public class BatchDataflowWorker implements Closeable {
    */
   private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
       new FixMultiOutputInfosOnParDoInstructions(idGenerator)
-          .andThen(new MapTaskToNetworkFunction());
+          .andThen(new MapTaskToNetworkFunction(idGenerator));
 
   /** Registry of known {@link ReaderFactory ReaderFactories}. */
   private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 78bba56..f2d31e8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -37,11 +37,11 @@ import com.google.common.graph.MutableNetwork;
 import com.google.common.graph.Network;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
@@ -338,19 +338,20 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
         Iterable<OutputReceiverNode> outputReceiverNodes =
             Iterables.filter(network.successors(input), OutputReceiverNode.class);
 
-        OutputReceiver[] outputReceivers = new OutputReceiver[Iterables.size(outputReceiverNodes)];
+        Map<String, OutputReceiver> outputReceiverMap = new HashMap<>();
         Lists.newArrayList(outputReceiverNodes)
             .stream()
-            .map(outputReceiverNode -> outputReceiverNode.getOutputReceiver())
-            .collect(Collectors.toList())
-            .toArray(outputReceivers);
-
+            .forEach(
+                outputReceiverNode ->
+                    outputReceiverMap.put(
+                        outputReceiverNode.getPcollectionId(),
+                        outputReceiverNode.getOutputReceiver()));
         return OperationNode.create(
             new ProcessRemoteBundleOperation(
                 executionContext.createOperationContext(
                     NameContext.create(stageName, stageName, stageName, stageName)),
                 stageBundleFactory,
-                outputReceivers));
+                outputReceiverMap));
       }
     };
   }
@@ -667,7 +668,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
                     cloudOutput.getName()));
         outputReceiver.addOutputCounter(outputCounter);
 
-        return OutputReceiverNode.create(outputReceiver, coder);
+        return OutputReceiverNode.create(outputReceiver, coder, input.getPcollectionId());
       }
     };
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 4ba66ba..6285dfd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -357,7 +357,7 @@ public class IntrinsicMapTaskExecutorFactory implements DataflowMapTaskExecutorF
                     cloudOutput.getName()));
         outputReceiver.addOutputCounter(outputCounter);
 
-        return OutputReceiverNode.create(outputReceiver, coder);
+        return OutputReceiverNode.create(outputReceiver, coder, input.getPcollectionId());
       }
     };
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d6de907..f32fa746 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -162,7 +162,7 @@ public class StreamingDataflowWorker {
    * </ul>
    */
   private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
-      new MapTaskToNetworkFunction();
+      new MapTaskToNetworkFunction(idGenerator);
 
   // Maximum number of threads for processing.  Currently each thread processes one key at a time.
   static final int MAX_PROCESSING_THREADS = 300;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index ae8a13e..8161970 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker.fn.control;
 
 import com.google.common.collect.Iterables;
 import java.io.Closeable;
+import java.util.Map;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
@@ -41,15 +42,15 @@ import org.slf4j.LoggerFactory;
 public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
   private static final Logger LOG = LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
   private final StageBundleFactory stageBundleFactory;
+  private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new OutputReceiver[0];
+  private final Map<String, OutputReceiver> outputReceiverMap;
   private final OutputReceiverFactory receiverFactory =
       new OutputReceiverFactory() {
         @Override
         public FnDataReceiver<?> create(String pCollectionId) {
           return receivedElement -> {
-            for (OutputReceiver receiver : receivers) {
-              LOG.debug("Consume element {}", receivedElement);
-              receiver.process((WindowedValue<?>) receivedElement);
-            }
+            LOG.debug("Consume element {}", receivedElement);
+            outputReceiverMap.get(pCollectionId).process((WindowedValue<?>) receivedElement);
           };
         }
       };
@@ -58,11 +59,14 @@ public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
   private RemoteBundle remoteBundle;
 
   public ProcessRemoteBundleOperation(
-      OperationContext context, StageBundleFactory stageBundleFactory, OutputReceiver[] receivers) {
-    super(receivers, context);
+      OperationContext context,
+      StageBundleFactory stageBundleFactory,
+      Map<String, OutputReceiver> outputReceiverMap) {
+    super(EMPTY_RECEIVER_ARRAY, context);
     this.stageBundleFactory = stageBundleFactory;
-    stateRequestHandler = StateRequestHandler.unsupported();
-    progressHandler = BundleProgressHandler.ignored();
+    this.outputReceiverMap = outputReceiverMap;
+    this.stateRequestHandler = StateRequestHandler.unsupported();
+    this.progressHandler = BundleProgressHandler.ignored();
   }
 
   @Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
index 7b356e8..04edc4e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
@@ -89,21 +89,24 @@ public class CloneAmbiguousFlattensFunction
    */
   private void cloneFlatten(Node flatten, MutableNetwork<Node, Edge> network) {
     // Start by creating the clones of the flatten and its PCollection.
-    Node flattenOut = Iterables.getOnlyElement(network.successors(flatten));
+    InstructionOutputNode flattenOut =
+        (InstructionOutputNode) Iterables.getOnlyElement(network.successors(flatten));
     ParallelInstruction flattenInstruction =
         ((ParallelInstructionNode) flatten).getParallelInstruction();
 
     Node runnerFlatten =
         ParallelInstructionNode.create(flattenInstruction, ExecutionLocation.RUNNER_HARNESS);
     Node runnerFlattenOut =
-        InstructionOutputNode.create(((InstructionOutputNode) flattenOut).getInstructionOutput());
+        InstructionOutputNode.create(
+            flattenOut.getInstructionOutput(), flattenOut.getPcollectionId());
     network.addNode(runnerFlatten);
     network.addNode(runnerFlattenOut);
 
     Node sdkFlatten =
         ParallelInstructionNode.create(flattenInstruction, ExecutionLocation.SDK_HARNESS);
     Node sdkFlattenOut =
-        InstructionOutputNode.create(((InstructionOutputNode) flattenOut).getInstructionOutput());
+        InstructionOutputNode.create(
+            flattenOut.getInstructionOutput(), flattenOut.getPcollectionId());
     network.addNode(sdkFlatten);
     network.addNode(sdkFlattenOut);
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index c93e9e9..81d0295 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -227,7 +227,7 @@ public class CreateExecutableStageNodeFunction
             e);
       }
 
-      String pcollectionId = "generatedPcollection" + idGenerator.getId();
+      String pcollectionId = node.getPcollectionId();
       RunnerApi.PCollection pCollection =
           RunnerApi.PCollection.newBuilder()
               .setCoderId(coderId)
@@ -351,6 +351,10 @@ public class CreateExecutableStageNodeFunction
       executableStageTransforms.add(PipelineNode.pTransform(ptransformId, pTransform.build()));
     }
 
+    if (executableStageInputs.size() != 1) {
+      throw new UnsupportedOperationException("ExecutableStage only support one input PCollection");
+    }
+
     PCollectionNode executableInput = executableStageInputs.iterator().next();
     RunnerApi.Components executableStageComponents = componentsBuilder.build();
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
index 5a1dffb..996a3e2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
@@ -261,9 +261,11 @@ public class CreateRegisterFnOperationFunction
       Set<Node> successors) {
 
     InstructionOutputNode newPredecessorOutputNode =
-        InstructionOutputNode.create(outputNode.getInstructionOutput());
+        InstructionOutputNode.create(
+            outputNode.getInstructionOutput(), outputNode.getPcollectionId());
     InstructionOutputNode portOutputNode =
-        InstructionOutputNode.create(outputNode.getInstructionOutput());
+        InstructionOutputNode.create(
+            outputNode.getInstructionOutput(), outputNode.getPcollectionId());
     String predecessorPortEdgeId = idGenerator.getId();
     String successorPortEdgeId = idGenerator.getId();
     Node portNode = portSupplier.apply(predecessorPortEdgeId, successorPortEdgeId);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
index fab79af..2803621 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
@@ -158,7 +158,8 @@ public class InsertFetchAndFilterStreamingSideInputNodes {
       InstructionOutputNode predecessor =
           (InstructionOutputNode) network.incidentNodes(mainInput).source();
       InstructionOutputNode predecessorCopy =
-          InstructionOutputNode.create(predecessor.getInstructionOutput());
+          InstructionOutputNode.create(
+              predecessor.getInstructionOutput(), predecessor.getPcollectionId());
       network.removeEdge(mainInput);
       network.addNode(streamingSideInputWindowHandlerNode);
       network.addNode(predecessorCopy);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 398037e..0487438 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -153,7 +153,7 @@ public class LengthPrefixUnknownCoders {
                 e);
           }
         }
-        return InstructionOutputNode.create(cloudOutput);
+        return InstructionOutputNode.create(cloudOutput, input.getPcollectionId());
       }
     };
   }
@@ -179,7 +179,7 @@ public class LengthPrefixUnknownCoders {
                   input.getInstructionOutput()),
               e);
         }
-        return InstructionOutputNode.create(instructionOutput);
+        return InstructionOutputNode.create(instructionOutput, input.getPcollectionId());
       }
     };
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
index 0856cc8..b9212bc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.util.Transport;
 
 /**
@@ -63,6 +64,12 @@ public class MapTaskToNetworkFunction implements Function<MapTask, MutableNetwor
     }
   }
 
+  private final IdGenerator idGenerator;
+
+  public MapTaskToNetworkFunction(IdGenerator idGenerator) {
+    this.idGenerator = idGenerator;
+  }
+
   @Override
   public MutableNetwork<Node, Edge> apply(MapTask mapTask) {
     List<ParallelInstruction> parallelInstructions = Apiary.listOrEmpty(mapTask.getInstructions());
@@ -98,7 +105,9 @@ public class MapTaskToNetworkFunction implements Function<MapTask, MutableNetwor
       // Connect the instruction node output to the output PCollection node
       for (int j = 0; j < outputs.size(); ++j) {
         InstructionOutput instructionOutput = outputs.get(j);
-        InstructionOutputNode outputNode = InstructionOutputNode.create(instructionOutput);
+        InstructionOutputNode outputNode =
+            InstructionOutputNode.create(
+                instructionOutput, "generatedPcollection" + this.idGenerator.getId());
         network.addNode(outputNode);
         if (parallelInstruction.getParDo() != null) {
           network.addEdge(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 5d058a97..a328e22 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -206,20 +206,25 @@ public class Nodes {
     public abstract ExecutionLocation getExecutionLocation();
   }
 
-  /** A node that stores {@link InstructionOutput}s. */
+  /** A node that stores {@link InstructionOutput}s with the corresponding . */
   @AutoValue
   public abstract static class InstructionOutputNode extends Node {
-    public static InstructionOutputNode create(InstructionOutput instructionOutput) {
+    public static InstructionOutputNode create(
+        InstructionOutput instructionOutput, String pcollectionId) {
       checkNotNull(instructionOutput);
-      return new AutoValue_Nodes_InstructionOutputNode(instructionOutput);
+      checkNotNull(pcollectionId);
+      return new AutoValue_Nodes_InstructionOutputNode(instructionOutput, pcollectionId);
     }
 
     public abstract InstructionOutput getInstructionOutput();
 
+    public abstract String getPcollectionId();
+
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)
           .add("instructionOutput", toStringWithTrimmedLiterals(getInstructionOutput()))
+          .add("pcollectionId", getPcollectionId())
           .toString();
     }
   }
@@ -227,14 +232,18 @@ public class Nodes {
   /** A node that stores {@link OutputReceiver}s. */
   @AutoValue
   public abstract static class OutputReceiverNode extends Node {
-    public static OutputReceiverNode create(OutputReceiver outputReceiver, Coder<?> coder) {
+    public static OutputReceiverNode create(
+        OutputReceiver outputReceiver, Coder<?> coder, String pcollectionId) {
       checkNotNull(outputReceiver);
-      return new AutoValue_Nodes_OutputReceiverNode(outputReceiver, coder);
+      checkNotNull(pcollectionId);
+      return new AutoValue_Nodes_OutputReceiverNode(outputReceiver, coder, pcollectionId);
     }
 
     public abstract OutputReceiver getOutputReceiver();
 
     public abstract Coder<?> getCoder();
+
+    public abstract String getPcollectionId();
   }
 
   /** A node that stores {@link Operation}s. */
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 182ad50..95836c5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -120,7 +120,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
 
   private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork =
       new FixMultiOutputInfosOnParDoInstructions(idGenerator)
-          .andThen(new MapTaskToNetworkFunction());
+          .andThen(new MapTaskToNetworkFunction(idGenerator));
 
   private static final CloudObject windowedStringCoder =
       CloudObjects.asCloudObject(
@@ -130,6 +130,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
   private PipelineOptions options;
   private ReaderRegistry readerRegistry;
   private SinkRegistry sinkRegistry;
+  private static final String PCOLLECTION_ID = "fakeId";
+
   @Mock private Network<Node, Edge> network;
   @Mock private CounterUpdateExtractor<?> updateExtractor;
 
@@ -326,7 +328,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
                 IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
@@ -524,7 +527,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
         IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
             .apply(
                 InstructionOutputNode.create(
-                    instructionNode.getParallelInstruction().getOutputs().get(0)));
+                    instructionNode.getParallelInstruction().getOutputs().get(0), PCOLLECTION_ID));
 
     when(network.successors(instructionNode)).thenReturn(ImmutableSet.of(outputReceiverNode));
     when(network.outDegree(instructionNode)).thenReturn(1);
@@ -601,7 +604,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
                 IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
@@ -652,7 +656,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
                 IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
@@ -729,7 +734,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
                 IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
index b74a287..04f1214 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
@@ -382,7 +382,7 @@ public final class CloneAmbiguousFlattensFunctionTest {
 
   /** Creates an {@link InstructionOutputNode} to act as a PCollection. */
   private static InstructionOutputNode createPCollection(String name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeId");
   }
 
   /** Creates a {@link NoLocationNode} to use for testing nodes that have no ExecutionLocation */
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
index 87d4035..ec24fd7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
@@ -535,7 +535,7 @@ public class CreateRegisterFnOperationFunctionTest {
   }
 
   private static InstructionOutputNode createInstructionOutputNode(String name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeId");
   }
 
   /** A named node to easily differentiate graph construction problems during testing. */
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
index 44fd072..c1c5df0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
@@ -381,7 +381,7 @@ public final class DeduceFlattenLocationsFunctionTest {
 
   /** Creates an {@link InstructionOutputNode} to act as a PCollection. */
   private static InstructionOutputNode createPCollection(String name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeID");
   }
 
   private static ExecutionLocation getExecutionLocationOf(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
index 4717cca..7d8177d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
@@ -198,9 +198,9 @@ public final class DeduceNodeLocationsFunctionTest {
     //               --> Flatten --> D
     // B --> out2 --/-->C
     Node a = createReadNode("A", CUSTOM_SOURCE);
-    Node out1 = InstructionOutputNode.create(new InstructionOutput());
+    Node out1 = InstructionOutputNode.create(new InstructionOutput(), "fakeId");
     Node b = createReadNode("B", RUNNER_SOURCE);
-    Node out2 = InstructionOutputNode.create(new InstructionOutput());
+    Node out2 = InstructionOutputNode.create(new InstructionOutput(), "fakeId");
     Node c = createParDoNode("C", "RunnerDoFn");
     Node flatten =
         ParallelInstructionNode.create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
index 6f727ee..8d39fc3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
@@ -93,7 +93,8 @@ public class InsertFetchAndFilterStreamingSideInputNodesTest {
     RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
 
     Node predecessor = createParDoNode("predecessor");
-    InstructionOutputNode mainInput = InstructionOutputNode.create(new InstructionOutput());
+    InstructionOutputNode mainInput =
+        InstructionOutputNode.create(new InstructionOutput(), "fakeId");
     Node sideInputParDo = createParDoNode(findParDoWithSideInput(pipeline));
 
     MutableNetwork<Node, Edge> network = createEmptyNetwork();
@@ -106,7 +107,7 @@ public class InsertFetchAndFilterStreamingSideInputNodesTest {
     Network<Node, Edge> inputNetwork = ImmutableNetwork.copyOf(network);
     network = InsertFetchAndFilterStreamingSideInputNodes.with(pipeline).forNetwork(network);
 
-    Node mainInputClone = InstructionOutputNode.create(mainInput.getInstructionOutput());
+    Node mainInputClone = InstructionOutputNode.create(mainInput.getInstructionOutput(), "fakeId");
     Node fetchAndFilter =
         FetchAndFilterStreamingSideInputsNode.create(
             pcView.getWindowingStrategyInternal(),
@@ -139,7 +140,7 @@ public class InsertFetchAndFilterStreamingSideInputNodesTest {
     RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
 
     Node predecessor = createParDoNode("predecessor");
-    Node mainInput = InstructionOutputNode.create(new InstructionOutput());
+    Node mainInput = InstructionOutputNode.create(new InstructionOutput(), "fakeId");
     Node sideInputParDo = createParDoNode("noSideInput");
 
     MutableNetwork<Node, Edge> network = createEmptyNetwork();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index 661e394..0ee0d45 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -367,6 +367,6 @@ public class LengthPrefixUnknownCodersTest {
             .setName(name)
             .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
     instructionOutput.setFactory(new JacksonFactory());
-    return InstructionOutputNode.create(instructionOutput);
+    return InstructionOutputNode.create(instructionOutput, "fakeId");
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
index 19b4556..8daa494 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.util.Transport;
 import org.hamcrest.Matchers;
 import org.junit.Test;
@@ -58,7 +59,8 @@ import org.junit.runners.JUnit4;
 public class MapTaskToNetworkFunctionTest {
   @Test
   public void testEmptyMapTask() {
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(new MapTask());
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(new MapTask());
     assertTrue(network.isDirected());
     assertTrue(network.allowsParallelEdges());
     assertFalse(network.allowsSelfLoops());
@@ -75,7 +77,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(read));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(2, network.nodes().size());
     assertEquals(1, network.edges().size());
@@ -103,7 +106,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(read, parDo));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(4, network.nodes().size());
     assertEquals(3, network.edges().size());
@@ -149,7 +153,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(readA, readB, flatten));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(6, network.nodes().size());
     assertEquals(5, network.edges().size());
@@ -193,7 +198,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(read, flatten));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(4, network.nodes().size());
     assertEquals(5, network.edges().size());
@@ -225,7 +231,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(read, write));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(3, network.nodes().size());
     assertEquals(2, network.edges().size());
@@ -260,7 +267,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(read, pgbk, write));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(5, network.nodes().size());
     assertEquals(4, network.edges().size());
@@ -310,7 +318,8 @@ public class MapTaskToNetworkFunctionTest {
     mapTask.setInstructions(ImmutableList.of(read, parDo, writeA, writeB));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(7, network.nodes().size());
     assertEquals(6, network.edges().size());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
index 6da18d7..b27520d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
@@ -57,6 +57,8 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link Nodes}. */
 @RunWith(JUnit4.class)
 public class NodesTest {
+  private static final String PCOLLECTION_ID = "fakeId";
+
   @Test
   public void testParallelInstructionNode() {
     ParallelInstruction param = new ParallelInstruction();
@@ -73,18 +75,22 @@ public class NodesTest {
   @Test
   public void testInstructionOutputNode() {
     InstructionOutput param = new InstructionOutput();
-    assertSame(param, InstructionOutputNode.create(param).getInstructionOutput());
-    assertNotEquals(InstructionOutputNode.create(param), InstructionOutputNode.create(param));
+    assertSame(param, InstructionOutputNode.create(param, PCOLLECTION_ID).getInstructionOutput());
+    assertNotEquals(
+        InstructionOutputNode.create(param, PCOLLECTION_ID),
+        InstructionOutputNode.create(param, PCOLLECTION_ID));
   }
 
   @Test
   public void testOutputReceiverNode() {
     OutputReceiver receiver = new OutputReceiver();
     Coder<?> coder = StringUtf8Coder.of();
-    assertSame(receiver, OutputReceiverNode.create(receiver, coder).getOutputReceiver());
-    assertSame(coder, OutputReceiverNode.create(receiver, coder).getCoder());
+    assertSame(
+        receiver, OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID).getOutputReceiver());
+    assertSame(coder, OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID).getCoder());
     assertNotEquals(
-        OutputReceiverNode.create(receiver, coder), OutputReceiverNode.create(receiver, coder));
+        OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID),
+        OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID));
   }
 
   @Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
index bfe3983..b00fb29 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
@@ -46,6 +46,8 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link RemoveFlattenInstructionsFunction}. */
 @RunWith(JUnit4.class)
 public class RemoveFlattenInstructionsFunctionTest {
+  private static final String PCOLLECTION_ID = "fakeId";
+
   @Test
   public void testEmptyNetwork() {
     assertTrue(
@@ -59,24 +61,28 @@ public class RemoveFlattenInstructionsFunctionTest {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
-    Node aPCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out"));
+    Node aPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out"), PCOLLECTION_ID);
     Edge aOutput = DefaultEdge.create();
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
     Edge bOutput = DefaultEdge.create();
-    Node bPCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out"));
+    Node bPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out"), PCOLLECTION_ID);
     Node flatten =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("Flatten").setFlatten(new FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flattenPCollection =
-        InstructionOutputNode.create(new InstructionOutput().setName("Flatten.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
 
     // A --\
     //      Flatten --> C
@@ -109,9 +115,12 @@ public class RemoveFlattenInstructionsFunctionTest {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
-    Node aOut1PCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out1"));
-    Node aOut2PCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out2"));
-    Node aOut3PCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out3"));
+    Node aOut1PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out1"), PCOLLECTION_ID);
+    Node aOut2PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out2"), PCOLLECTION_ID);
+    Node aOut3PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out3"), PCOLLECTION_ID);
     Edge aOut1 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out1"));
     Edge aOut2 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out2"));
     Edge aOut3 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out3"));
@@ -119,8 +128,10 @@ public class RemoveFlattenInstructionsFunctionTest {
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
-    Node bOut1PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
-    Node bOut2PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
+    Node bOut1PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
+    Node bOut2PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
     Edge bOut1 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out1"));
     Edge bOut2 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out2"));
     Edge bOut1PCollectionEdge = DefaultEdge.create();
@@ -129,22 +140,26 @@ public class RemoveFlattenInstructionsFunctionTest {
             new ParallelInstruction().setName("Flatten").setFlatten(new FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flattenPCollection =
-        InstructionOutputNode.create(new InstructionOutput().setName("Flatten.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
     Node d =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("D"), Nodes.ExecutionLocation.UNKNOWN);
     Edge dOutput = DefaultEdge.create();
-    Node dPCollection = InstructionOutputNode.create(new InstructionOutput().setName("D.out"));
+    Node dPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("D.out"), PCOLLECTION_ID);
     Node e =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("E"), Nodes.ExecutionLocation.UNKNOWN);
     Edge eOutput = DefaultEdge.create();
-    Node ePCollection = InstructionOutputNode.create(new InstructionOutput().setName("E.out"));
+    Node ePCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("E.out"), PCOLLECTION_ID);
 
     //  /-out1-> C
     // A -out2-\
@@ -196,13 +211,16 @@ public class RemoveFlattenInstructionsFunctionTest {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
-    Node aPCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out"));
+    Node aPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out"), PCOLLECTION_ID);
     Edge aOutput = DefaultEdge.create();
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
-    Node bOut1PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
-    Node bOut2PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
+    Node bOut1PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
+    Node bOut2PCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
     Edge bOut1 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out1"));
     Edge bOut2 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out2"));
     Node flatten1 =
@@ -210,18 +228,21 @@ public class RemoveFlattenInstructionsFunctionTest {
             new ParallelInstruction().setName("Flatten1").setFlatten(new FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flatten1PCollection =
-        InstructionOutputNode.create(new InstructionOutput().setName("Flatten1.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten1.out"), PCOLLECTION_ID);
     Node flatten2 =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("Flatten2").setFlatten(new FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flatten2PCollection =
-        InstructionOutputNode.create(new InstructionOutput().setName("Flatten2.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten2.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
 
     // A ------\
     //          Flatten1 --\
@@ -262,29 +283,34 @@ public class RemoveFlattenInstructionsFunctionTest {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
-    Node aPCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out"));
+    Node aPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out"), PCOLLECTION_ID);
     Edge aOutput = DefaultEdge.create();
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
     Edge bOutput = DefaultEdge.create();
-    Node bPCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out"));
+    Node bPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out"), PCOLLECTION_ID);
     Node flatten =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("Flatten").setFlatten(new FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flattenPCollection =
-        InstructionOutputNode.create(new InstructionOutput().setName("Flatten.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
     Node d =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("D"), Nodes.ExecutionLocation.UNKNOWN);
     Edge dOutput = DefaultEdge.create();
-    Node dPCollection = InstructionOutputNode.create(new InstructionOutput().setName("D.out"));
+    Node dPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("D.out"), PCOLLECTION_ID);
 
     // A --\
     //      -> Flatten --> C
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
index 801f4af..6f0b826 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
@@ -148,6 +148,6 @@ public final class ReplacePgbkWithPrecombineFunctionTest {
 
   /** Creates an {@link InstructionOutputNode} to act as a PCollection. */
   private static InstructionOutputNode createInstructionOutputNode(String name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeId");
   }
 }


Mime
View raw message