beam-commits mailing list archives

From: k...@apache.org
Subject: [07/13] incubator-beam git commit: Port Write to new DoFn
Date: Mon, 08 Aug 2016 20:40:42 GMT
Port Write to new DoFn
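
Methods that were overridden on OldDoFn become methods annotated with the new DoFn's
@ProcessElement and @FinishBundle, and imports and javadoc references move from OldDoFn
to DoFn. A minimal sketch of the pattern, using a hypothetical UppercaseFn that is not
part of this commit:

    import org.apache.beam.sdk.transforms.DoFn;

    class UppercaseFn extends DoFn<String, String> {
      // Old style: @Override public void processElement(ProcessContext c)
      // New style: a method annotated with @ProcessElement
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
      }

      // Old style: @Override public void finishBundle(Context c)
      // New style: a method annotated with @FinishBundle
      @FinishBundle
      public void finishBundle(Context c) {
        // Per-bundle cleanup (e.g. closing a lazily created Writer) goes here.
      }
    }

The diffs below apply this transformation to WriteBundles, WriteShardedBundles, and the
anonymous Initialize/Finalize DoFns in Write.java, and to AddArbitraryKey and
RemoveArbitraryKey in WriteTest.java.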


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/86291de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/86291de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/86291de3

Branch: refs/heads/master
Commit: 86291de39772765f4d6d404ac8a8430d8ad8a15f
Parents: 2c6aaf7
Author: Kenneth Knowles <klk@google.com>
Authored: Fri Aug 5 11:49:37 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/Write.java | 26 ++++++++++----------
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 22 ++++++++++-------
 2 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 3e997b0..a846b7c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -156,7 +156,7 @@ public class Write {
      * Writes all the elements in a bundle using a {@link Writer} produced by the
      * {@link WriteOperation} associated with the {@link Sink}.
      */
-    private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> {
+    private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
       // Writer that will write the records in this bundle. Lazily
       // initialized in processElement.
       private Writer<T, WriteT> writer = null;
@@ -166,7 +166,7 @@ public class Write {
         this.writeOperationView = writeOperationView;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // Lazily initialize the Writer
         if (writer == null) {
@@ -182,7 +182,7 @@ public class Write {
           // Discard write result and close the write.
           try {
             writer.close();
-            // The writer does not need to be reset, as this OldDoFn cannot be reused.
+            // The writer does not need to be reset, as this DoFn cannot be reused.
           } catch (Exception closeException) {
             if (closeException instanceof InterruptedException) {
               // Do not silently ignore interrupted state.
@@ -195,7 +195,7 @@ public class Write {
         }
       }
 
-      @Override
+      @FinishBundle
       public void finishBundle(Context c) throws Exception {
         if (writer != null) {
           WriteT result = writer.close();
@@ -217,14 +217,14 @@ public class Write {
      *
      * @see WriteBundles
      */
-    private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> {
+    private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
       private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
 
       WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
         this.writeOperationView = writeOperationView;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // In a sharded write, single input element represents one shard. We can open and close
         // the writer in each call to processElement.
@@ -296,8 +296,8 @@ public class Write {
      * <p>This singleton collection containing the WriteOperation is then used as a side input to a
      * ParDo over the PCollection of elements to write. In this bundle-writing phase,
      * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-     * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and
-     * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
+     * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+     * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
      * every element in the bundle. The output of this ParDo is a PCollection of
      * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
      * each bundle.
@@ -334,8 +334,8 @@ public class Write {
       // Initialize the resource in a do-once ParDo on the WriteOperation.
       operationCollection = operationCollection
           .apply("Initialize", ParDo.of(
-              new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-            @Override
+              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
               LOG.info("Initializing write operation {}", writeOperation);
@@ -388,8 +388,8 @@ public class Write {
       // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
       // collection as a side input), so it will happen after the parallel write.
       operationCollection
-          .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() {
-            @Override
+          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
               LOG.info("Finalizing write operation {}.", writeOperation);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 4b6e749..705b77c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.values.KV.of;
 
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,6 +30,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import static java.util.concurrent.ThreadLocalRandom.current;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -41,9 +44,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOption
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -73,7 +76,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -102,16 +104,18 @@ public class WriteTest {
       this.window = window;
     }
 
-    private static class AddArbitraryKey<T> extends OldDoFn<T, KV<Integer, T>> {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
+    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(of(current().nextInt(), c.element()));
       }
     }
 
-    private static class RemoveArbitraryKey<T> extends OldDoFn<KV<Integer, Iterable<T>>, T> {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
+    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
         for (T s : c.element().getValue()) {
           c.output(s);
         }

