beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [05/50] [abbrv] incubator-beam git commit: Expose base output file name on FileBasedSink
Date Fri, 26 Feb 2016 22:54:42 GMT
Expose base output file name on FileBasedSink

----Release Notes----
Add the ability to get the base output filename from FileBasedSink.

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115265994


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c857afaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c857afaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c857afaf

Branch: refs/heads/master
Commit: c857afaf1911175a6d907f53167e39e9b639c665
Parents: 209364e
Author: lcwik <lcwik@google.com>
Authored: Mon Feb 22 13:49:38 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:25 2016 -0800

----------------------------------------------------------------------
 .../com/google/cloud/dataflow/sdk/io/FileBasedSink.java  | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c857afaf/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
index f14f4bf..7c30167 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
@@ -121,6 +121,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   }
 
   /**
+   * Returns the base output filename for this file based sink.
+   */
+  public String getBaseOutputFilename() {
+    return baseOutputFilename;
+  }
+
+  /**
    * Perform pipeline-construction-time validation. The default implementation is a no-op.
    * Subclasses should override to ensure the sink is valid and can be written to. It is recommended
    * to use {@link Preconditions} in the implementation of this method.
@@ -806,6 +813,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   }
 
   static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
     public PCollection<T> apply(PCollection<T> input) {
       return input
           // TODO: This would need to be adapted to write per-window shards.
@@ -815,10 +823,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
           .apply("RandomKey", ParDo.of(
               new DoFn<T, KV<Long, T>>() {
                 transient long counter, step;
+                @Override
                 public void startBundle(Context c) {
                   counter = (long) (Math.random() * Long.MAX_VALUE);
                   step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
                 }
+                @Override
                 public void processElement(ProcessContext c) {
                   counter += step;
                   c.output(KV.of(counter, c.element()));
@@ -827,6 +837,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
           .apply(GroupByKey.<Long, T>create())
           .apply("Ungroup", ParDo.of(
               new DoFn<KV<Long, Iterable<T>>, T>() {
+                @Override
                 public void processElement(ProcessContext c) {
                   for (T item : c.element().getValue()) {
                     c.output(item);


Mime
View raw message