beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/3] beam git commit: Split StartBundleContext and FinishBundleContext
Date Thu, 04 May 2017 22:55:28 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8af5c28d4 -> defb55405


http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index c16eea2..fe96e87 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -71,6 +71,8 @@ import org.mockito.MockitoAnnotations;
 public class DoFnInvokersTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
+  @Mock private DoFn<String, String>.StartBundleContext mockStartBundleContext;
+  @Mock private DoFn<String, String>.FinishBundleContext mockFinishBundleContext;
   @Mock private DoFn<String, String>.ProcessContext mockProcessContext;
   @Mock private IntervalWindow mockWindow;
   @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
@@ -79,6 +81,10 @@ public class DoFnInvokersTest {
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     when(mockArgumentProvider.window()).thenReturn(mockWindow);
+    when(mockArgumentProvider.startBundleContext(Matchers.<DoFn>any()))
+        .thenReturn(mockStartBundleContext);
+    when(mockArgumentProvider.finishBundleContext(Matchers.<DoFn>any()))
+        .thenReturn(mockFinishBundleContext);
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }
 
@@ -233,10 +239,10 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c) {}
 
       @StartBundle
-      public void startBundle(Context c) {}
+      public void startBundle(StartBundleContext c) {}
 
       @FinishBundle
-      public void finishBundle(Context c) {}
+      public void finishBundle(FinishBundleContext c) {}
 
       @Setup
       public void before() {}
@@ -247,12 +253,12 @@ public class DoFnInvokersTest {
     MockFn fn = mock(MockFn.class);
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
     invoker.invokeSetup();
-    invoker.invokeStartBundle(mockProcessContext);
-    invoker.invokeFinishBundle(mockProcessContext);
+    invoker.invokeStartBundle(mockStartBundleContext);
+    invoker.invokeFinishBundle(mockFinishBundleContext);
     invoker.invokeTeardown();
     verify(fn).before();
-    verify(fn).startBundle(mockProcessContext);
-    verify(fn).finishBundle(mockProcessContext);
+    verify(fn).startBundle(mockStartBundleContext);
+    verify(fn).finishBundle(mockFinishBundleContext);
     verify(fn).after();
   }
 
@@ -601,7 +607,7 @@ public class DoFnInvokersTest {
         DoFnInvokers.invokerFor(
             new DoFn<Integer, Integer>() {
               @StartBundle
-              public void startBundle(@SuppressWarnings("unused") Context c) {
+              public void startBundle(@SuppressWarnings("unused") StartBundleContext c) {
                 throw new IllegalArgumentException("bogus");
               }
 
@@ -619,7 +625,7 @@ public class DoFnInvokersTest {
         DoFnInvokers.invokerFor(
             new DoFn<Integer, Integer>() {
               @FinishBundle
-              public void finishBundle(@SuppressWarnings("unused") Context c) {
+              public void finishBundle(@SuppressWarnings("unused") FinishBundleContext c)
{
                 throw new IllegalArgumentException("bogus");
               }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index d6cc4f6..f099d5d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -76,13 +76,14 @@ public class DoFnSignaturesTest {
   @Test
   public void testBadExtraContext() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must take a single argument of type DoFn<Integer, String>.Context");
+    thrown.expectMessage(
+        "Must take a single argument of type DoFn<Integer, String>.StartBundleContext");
 
-    DoFnSignatures.analyzeBundleMethod(
+    DoFnSignatures.analyzeStartBundleMethod(
         errors(),
         TypeDescriptor.of(FakeDoFn.class),
         new DoFnSignaturesTestUtils.AnonymousMethod() {
-          void method(DoFn<Integer, String>.Context c, int n) {}
+          void method(DoFn<Integer, String>.StartBundleContext c, int n) {}
         }.getMethod(),
         TypeDescriptor.of(Integer.class),
         TypeDescriptor.of(String.class));
@@ -112,8 +113,8 @@ public class DoFnSignaturesTest {
   public void testMultipleFinishBundleMethods() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
-    thrown.expectMessage("bar(Context)");
-    thrown.expectMessage("baz(Context)");
+    thrown.expectMessage("bar(FinishBundleContext)");
+    thrown.expectMessage("baz(FinishBundleContext)");
     thrown.expectMessage(getClass().getName() + "$");
     DoFnSignatures.getSignature(
         new DoFn<String, String>() {
@@ -121,10 +122,10 @@ public class DoFnSignaturesTest {
           public void foo(ProcessContext context) {}
 
           @FinishBundle
-          public void bar(Context context) {}
+          public void bar(FinishBundleContext context) {}
 
           @FinishBundle
-          public void baz(Context context) {}
+          public void baz(FinishBundleContext context) {}
         }.getClass());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 1cdd087..6d5e230 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -304,20 +305,21 @@ public class ProcessBundleHandlerTest {
     private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
     private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
 
-    @StartBundle
-    public void startBundle(Context context) {
-      context.output("StartBundle");
-    }
+    private BoundedWindow window;
 
     @ProcessElement
-    public void processElement(ProcessContext context) {
+    public void processElement(ProcessContext context, BoundedWindow window) {
       context.output("MainOutput" + context.element());
       context.output(additionalOutput, "AdditionalOutput" + context.element());
+      this.window = window;
     }
 
     @FinishBundle
-    public void finishBundle(Context context) {
-      context.output("FinishBundle");
+    public void finishBundle(FinishBundleContext context) {
+      if (window != null) {
+        context.output("FinishBundle", window.maxTimestamp(), window);
+        window = null;
+      }
     }
   }
 
@@ -411,7 +413,6 @@ public class ProcessBundleHandlerTest {
         finishFunctions::add);
 
     Iterables.getOnlyElement(startFunctions).run();
-    assertThat(mainOutputValues, contains(valueInGlobalWindow("StartBundle")));
     mainOutputValues.clear();
 
     assertEquals(newConsumers.keySet(),

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 0a3b900..f6ceef2 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
@@ -40,7 +39,6 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -757,7 +755,7 @@ public class ElasticsearchIO {
       }
 
       @StartBundle
-      public void startBundle(Context context) throws Exception {
+      public void startBundle(StartBundleContext context) throws Exception {
         batch = new ArrayList<>();
         currentBatchSizeBytes = 0;
       }
@@ -769,12 +767,16 @@ public class ElasticsearchIO {
         currentBatchSizeBytes += document.getBytes().length;
         if (batch.size() >= spec.getMaxBatchSize()
             || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
-          finishBundle(context);
+          flushBatch();
         }
       }
 
       @FinishBundle
-      public void finishBundle(Context context) throws Exception {
+      public void finishBundle(FinishBundleContext context) throws Exception {
+        flushBatch();
+      }
+
+      private void flushBatch() throws IOException {
         if (batch.isEmpty()) {
           return;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index fd5f396..f267976 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -56,7 +56,7 @@ class StreamingWriteFn
 
   /** Prepares a target BigQuery table. */
   @StartBundle
-  public void startBundle(Context context) {
+  public void startBundle() {
     tableRows = new HashMap<>();
     uniqueIdsForTableRows = new HashMap<>();
   }
@@ -75,7 +75,7 @@ class StreamingWriteFn
 
   /** Writes the accumulated rows into BigQuery with streaming API. */
   @FinishBundle
-  public void finishBundle(Context context) throws Exception {
+  public void finishBundle(FinishBundleContext context) throws Exception {
     BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
     for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
       TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 284691e..cd88222 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -40,7 +40,7 @@ class TagWithUniqueIds
   private transient long sequenceNo = 0L;
 
   @StartBundle
-  public void startBundle(Context context) {
+  public void startBundle() {
     randomUUID = UUID.randomUUID().toString();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index e90b974..70aa135 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,8 +49,10 @@ class WriteBundlesToFiles<DestinationT>
 
   // Map from tablespec to a writer for that table.
   private transient Map<DestinationT, TableRowWriter> writers;
+  private transient Map<DestinationT, BoundedWindow> writerWindows;
   private final String stepUuid;
 
+
   /**
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output
file,
    * and encapsulates the table it is destined to as well as the file byte size.
@@ -110,14 +113,15 @@ class WriteBundlesToFiles<DestinationT>
   }
 
   @StartBundle
-  public void startBundle(Context c) {
+  public void startBundle() {
     // This must be done each bundle, as by default the {@link DoFn} might be reused between
     // bundles.
     this.writers = Maps.newHashMap();
+    this.writerWindows = Maps.newHashMap();
   }
 
   @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
+  public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
     String tempFilePrefix = resolveTempLocation(
         c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
     TableRowWriter writer = writers.get(c.element().getKey());
@@ -125,6 +129,7 @@ class WriteBundlesToFiles<DestinationT>
       writer = new TableRowWriter(tempFilePrefix);
       writer.open(UUID.randomUUID().toString());
       writers.put(c.element().getKey(), writer);
+      writerWindows.put(c.element().getKey(), window);
       LOG.debug("Done opening writer {}", writer);
     }
     try {
@@ -143,11 +148,15 @@ class WriteBundlesToFiles<DestinationT>
   }
 
   @FinishBundle
-  public void finishBundle(Context c) throws Exception {
+  public void finishBundle(FinishBundleContext c) throws Exception {
     for (Map.Entry<DestinationT, TableRowWriter> entry : writers.entrySet()) {
       TableRowWriter.Result result = entry.getValue().close();
-      c.output(new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey()));
+      c.output(
+          new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey()),
+          writerWindows.get(entry.getKey()).maxTimestamp(),
+          writerWindows.get(entry.getKey()));
     }
     writers.clear();
+    writerWindows.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 69fac68..0e97c12 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -572,7 +572,7 @@ public class BigtableIO {
       }
 
       @StartBundle
-      public void startBundle(Context c) throws IOException {
+      public void startBundle(StartBundleContext c) throws IOException {
         if (bigtableWriter == null) {
           bigtableWriter = bigtableServiceFactory.apply(
               c.getPipelineOptions()).openForWriting(tableId);
@@ -589,7 +589,7 @@ public class BigtableIO {
       }
 
       @FinishBundle
-      public void finishBundle(Context c) throws Exception {
+      public void finishBundle() throws Exception {
         bigtableWriter.flush();
         checkForFailures();
         LOG.info("Wrote {} records", recordsWritten);

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index f619429..fd4fccf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -704,7 +704,7 @@ public class DatastoreV1 {
       }
 
       @StartBundle
-      public void startBundle(Context c) throws Exception {
+      public void startBundle(StartBundleContext c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), v1Options.getProjectId());
       }
 
@@ -748,7 +748,7 @@ public class DatastoreV1 {
       }
 
       @StartBundle
-      public void startBundle(Context c) throws Exception {
+      public void startBundle(StartBundleContext c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(),
             options.getLocalhost());
         querySplitter = datastoreFactory.getQuerySplitter();
@@ -821,7 +821,7 @@ public class DatastoreV1 {
       }
 
       @StartBundle
-      public void startBundle(Context c) throws Exception {
+      public void startBundle(StartBundleContext c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(),
             options.getLocalhost());
       }
@@ -1145,7 +1145,7 @@ public class DatastoreV1 {
     }
 
     @StartBundle
-    public void startBundle(Context c) {
+    public void startBundle(StartBundleContext c) {
       datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(),
localhost);
     }
 
@@ -1158,7 +1158,7 @@ public class DatastoreV1 {
     }
 
     @FinishBundle
-    public void finishBundle(Context c) throws Exception {
+    public void finishBundle() throws Exception {
       if (!mutations.isEmpty()) {
         flushBatch();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index e023ad0..fa2d20f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -888,7 +888,7 @@ public class PubsubIO {
       private transient PubsubClient pubsubClient;
 
       @StartBundle
-      public void startBundle(Context c) throws IOException {
+      public void startBundle(StartBundleContext c) throws IOException {
         this.output = new ArrayList<>();
         // NOTE: idAttribute is ignored.
         this.pubsubClient =
@@ -911,7 +911,7 @@ public class PubsubIO {
       }
 
       @FinishBundle
-      public void finishBundle(Context c) throws IOException {
+      public void finishBundle() throws IOException {
         if (!output.isEmpty()) {
           publish();
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 9d97e91..031d9a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -255,7 +255,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
     }
 
     @StartBundle
-    public void startBundle(Context c) throws Exception {
+    public void startBundle(StartBundleContext c) throws Exception {
       checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
       pubsubClient = pubsubFactory.newClient(timestampAttribute, idAttribute,
                                              c.getPipelineOptions().as(PubsubOptions.class));
@@ -287,7 +287,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
     }
 
     @FinishBundle
-    public void finishBundle(Context c) throws Exception {
+    public void finishBundle() throws Exception {
       pubsubClient.close();
       pubsubClient = null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
index 86a9246..ef6556e 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -223,6 +223,7 @@ public class Write<T> extends PTransform<PCollection<T>,
PDone> {
     // Writer that will write the records in this bundle. Lazily
     // initialized in processElement.
     private Writer<T, WriteT> writer = null;
+    private BoundedWindow window;
     private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
 
     WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView)
{
@@ -243,6 +244,7 @@ public class Write<T> extends PTransform<PCollection<T>,
PDone> {
         } else {
           writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
         }
+        this.window = window;
         LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
       }
       try {
@@ -265,12 +267,13 @@ public class Write<T> extends PTransform<PCollection<T>,
PDone> {
     }
 
     @FinishBundle
-    public void finishBundle(Context c) throws Exception {
+    public void finishBundle(FinishBundleContext c) throws Exception {
       if (writer != null) {
         WriteT result = writer.close();
-        c.output(result);
+        c.output(result, window.maxTimestamp(), window);
         // Reset state in case of reuse.
         writer = null;
+        window = null;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index eee8927..3c42da9 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -634,11 +634,6 @@ public class HBaseIO {
                 recordsWritten = 0;
             }
 
-            @StartBundle
-            public void startBundle(Context c) throws Exception {
-
-            }
-
             @ProcessElement
             public void processElement(ProcessContext ctx) throws Exception {
                 KV<byte[], Iterable<Mutation>> record = ctx.element();
@@ -651,7 +646,7 @@ public class HBaseIO {
             }
 
             @FinishBundle
-            public void finishBundle(Context c) throws Exception {
+            public void finishBundle() throws Exception {
                 mutator.flush();
             }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 2d48236..2eb53dd 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Random;
 
 import javax.annotation.Nullable;
@@ -480,7 +481,7 @@ public class JdbcIO {
       }
 
       @StartBundle
-      public void startBundle(Context context) {
+      public void startBundle() {
         batchCount = 0;
       }
 
@@ -495,12 +496,16 @@ public class JdbcIO {
         batchCount++;
 
         if (batchCount >= DEFAULT_BATCH_SIZE) {
-          finishBundle(context);
+          executeBatch();
         }
       }
 
       @FinishBundle
-      public void finishBundle(Context context) throws Exception {
+      public void finishBundle() throws Exception {
+        executeBatch();
+      }
+
+      private void executeBatch() throws SQLException {
         if (batchCount > 0) {
           preparedStatement.executeBatch();
           connection.commit();

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 813e051..4493e56 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -674,7 +674,7 @@ public class JmsIO {
       }
 
       @StartBundle
-      public void startBundle(Context c) throws Exception {
+      public void startBundle() throws Exception {
         if (producer == null) {
           if (spec.getUsername() != null) {
             this.connection =
@@ -703,13 +703,13 @@ public class JmsIO {
           TextMessage message = session.createTextMessage(value);
           producer.send(message);
         } catch (Exception t) {
-          finishBundle(null);
+          finishBundle();
           throw t;
         }
       }
 
       @FinishBundle
-      public void finishBundle(Context c) throws Exception {
+      public void finishBundle() throws Exception {
         producer.close();
         producer = null;
         session.close();

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8ab33d1..f4de76a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1632,7 +1632,7 @@ public class KafkaIO {
     }
 
     @FinishBundle
-    public void finishBundle(Context c) throws IOException {
+    public void finishBundle() throws IOException {
       producer.flush();
       checkForFailures();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 940d875..0868ed4 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -641,7 +641,7 @@ public class MongoDbGridFSIO {
     }
 
     @StartBundle
-    public void startBundle(Context context) {
+    public void startBundle() {
       gridFsFile = gridfs.createFile(spec.filename());
       if (spec.chunkSize() != null) {
         gridFsFile.setChunkSize(spec.chunkSize());
@@ -656,7 +656,7 @@ public class MongoDbGridFSIO {
     }
 
     @FinishBundle
-    public void finishBundle(Context context) throws Exception {
+    public void finishBundle() throws Exception {
       if (gridFsFile != null) {
         outputStream.flush();
         outputStream.close();

http://git-wip-us.apache.org/repos/asf/beam/blob/d59d9b74/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index f8edbf1..7236a50 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -466,7 +466,7 @@ public class MongoDbIO {
       }
 
       @StartBundle
-      public void startBundle(Context ctx) throws Exception {
+      public void startBundle() throws Exception {
         batch = new ArrayList<>();
       }
 
@@ -476,12 +476,16 @@ public class MongoDbIO {
         // before inserting (will assign an id).
         batch.add(new Document(ctx.element()));
         if (batch.size() >= spec.batchSize()) {
-          finishBundle(ctx);
+          flush();
         }
       }
 
       @FinishBundle
-      public void finishBundle(Context ctx) throws Exception {
+      public void finishBundle() throws Exception {
+        flush();
+      }
+
+      private void flush() {
         MongoDatabase mongoDatabase = client.getDatabase(spec.database());
         MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
 


Mime
View raw message