beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [11/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
Date Sun, 20 Aug 2017 15:03:11 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
index c487578..77e4381 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
@@ -18,94 +18,101 @@
 package org.apache.beam.runners.jstorm.translation.translator;
 
 import avro.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
 import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor;
 import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
  */
 public class ParDoBoundMultiTranslator<InputT, OutputT>
-        extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
-        final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
-        PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
 
-        Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
-        Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
-        for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
-            Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
-            localToExternalTupleTagMap.put(entry.getKey(), itr.next());
-        }
+  @Override
+  public void translateNode(
+      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
+    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
 
-        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-        List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
-        sideOutputTags.remove(mainOutputTag);
+    Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
+    Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
+    for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
+      Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
+      localToExternalTupleTagMap.put(entry.getKey(), itr.next());
+    }
 
-        Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
-        String description = describeTransform(
-                transform,
-                allInputs,
-                allOutputs);
+    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+    List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
+    sideOutputTags.remove(mainOutputTag);
 
-        ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
+    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+    String description = describeTransform(
+        transform,
+        allInputs,
+        allOutputs);
 
-        DoFnExecutor executor;
-        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-        if (signature.stateDeclarations().size() > 0
-                || signature.timerDeclarations().size() > 0) {
-            executor = new MultiStatefulDoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    (DoFn<KV, OutputT>) transform.getFn(),
-                    (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    (TupleTag<KV>) inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags,
-                    localToExternalTupleTagMap);
-        } else {
-            executor = new MultiOutputDoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    transform.getFn(),
-                    WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags,
-                    localToExternalTupleTagMap);
-        }
+    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
 
-        context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+    DoFnExecutor executor;
+    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    if (signature.stateDeclarations().size() > 0
+        || signature.timerDeclarations().size() > 0) {
+      executor = new MultiStatefulDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          (DoFn<KV, OutputT>) transform.getFn(),
+          (Coder) WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<KV>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags,
+          localToExternalTupleTagMap);
+    } else {
+      executor = new MultiOutputDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          transform.getFn(),
+          WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags,
+          localToExternalTupleTagMap);
     }
+
+    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
index 3a952a9..7b998d9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
@@ -17,24 +17,25 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
-import java.util.List;
-import java.util.Map;
-
 import avro.shaded.com.google.common.collect.Lists;
-import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,65 +43,68 @@ import org.slf4j.LoggerFactory;
  * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
  */
 public class ParDoBoundTranslator<InputT, OutputT>
-        extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+    extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
 
-    @Override
-    public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
-        final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        final TupleTag<?> inputTag = userGraphContext.getInputTag();
-        PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
 
-        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-        List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
+  @Override
+  public void translateNode(
+      ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    final TupleTag<?> inputTag = userGraphContext.getInputTag();
+    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
 
-        Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
-        String description = describeTransform(
-                transform,
-                allInputs,
-                userGraphContext.getOutputs());
+    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+    List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
 
-        ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
+    Map<TupleTag<?>, PValue> allInputs =
+        avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+    String description = describeTransform(
+        transform,
+        allInputs,
+        userGraphContext.getOutputs());
 
-        DoFnExecutor executor;
-        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-        if (signature.stateDeclarations().size() > 0
-                || signature.timerDeclarations().size() > 0) {
-            executor = new StatefulDoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    (DoFn<KV, OutputT>) transform.getFn(),
-                    (Coder) WindowedValue.getFullCoder(
-                            input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    (TupleTag<KV>) inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags);
-        } else {
-            executor = new DoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    transform.getFn(),
-                    WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    (TupleTag<InputT>) inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags);
-        }
+    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
 
-        context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+    DoFnExecutor executor;
+    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    if (signature.stateDeclarations().size() > 0
+        || signature.timerDeclarations().size() > 0) {
+      executor = new StatefulDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          (DoFn<KV, OutputT>) transform.getFn(),
+          (Coder) WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<KV>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags);
+    } else {
+      executor = new DoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          transform.getFn(),
+          WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<InputT>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags);
     }
+
+    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
index 1ef1ec3..c450a22 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
@@ -19,6 +19,6 @@ package org.apache.beam.runners.jstorm.translation.translator;
 
 import org.apache.beam.sdk.transforms.Reshuffle;
 
-public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
-    
+public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K, V>> {
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
index 5b5a8e2..a15a8ba 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -17,76 +17,79 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
-import com.google.auto.value.AutoValue;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import javax.annotation.Nullable;
+
 /**
  * Class that defines the stream connection between upstream and downstream components.
  */
 @AutoValue
 public abstract class Stream {
 
-    public abstract Producer getProducer();
-    public abstract Consumer getConsumer();
+  public abstract Producer getProducer();
 
-    public static Stream of(Producer producer, Consumer consumer) {
-        return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream(
-            producer, consumer);
+  public abstract Consumer getConsumer();
+
+  public static Stream of(Producer producer, Consumer consumer) {
+    return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream(
+        producer, consumer);
+  }
+
+  @AutoValue
+  public abstract static class Producer {
+    public abstract String getComponentId();
+
+    public abstract String getStreamId();
+
+    public abstract String getStreamName();
+
+    public static Producer of(String componentId, String streamId, String streamName) {
+      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer(
+          componentId, streamId, streamName);
     }
+  }
 
-    @AutoValue
-    public abstract static class Producer {
-        public abstract String getComponentId();
-        public abstract String getStreamId();
-        public abstract String getStreamName();
+  @AutoValue
+  public abstract static class Consumer {
+    public abstract String getComponentId();
 
-        public static Producer of(String componentId, String streamId, String streamName) {
-            return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer(
-                    componentId, streamId, streamName);
-        }
+    public abstract Grouping getGrouping();
+
+    public static Consumer of(String componentId, Grouping grouping) {
+      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer(
+          componentId, grouping);
     }
+  }
+
+  @AutoValue
+  public abstract static class Grouping {
+    public abstract Type getType();
 
-    @AutoValue
-    public abstract static class Consumer {
-        public abstract String getComponentId();
-        public abstract Grouping getGrouping();
+    @Nullable
+    public abstract List<String> getFields();
+
+    public static Grouping of(Type type) {
+      checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
+      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
+          type, null /* fields */);
+    }
 
-        public static Consumer of(String componentId, Grouping grouping) {
-            return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer(
-                    componentId, grouping);
-        }
+    public static Grouping byFields(List<String> fields) {
+      checkNotNull(fields, "fields");
+      checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
+      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
+          Type.FIELDS, fields);
     }
 
-    @AutoValue
-    public abstract static class Grouping {
-        public abstract Type getType();
-
-        @Nullable
-        public abstract List<String> getFields();
-
-        public static Grouping of(Type type) {
-            checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
-            return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
-                    type, null /* fields */);
-        }
-
-        public static Grouping byFields(List<String> fields) {
-            checkNotNull(fields, "fields");
-            checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
-            return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
-                    Type.FIELDS, fields);
-        }
-
-        /**
-         * Types of stream groupings Storm allows
-         */
-        public enum Type {
-            ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
-        }
+    /**
+     * Types of stream groupings Storm allows
+     */
+    public enum Type {
+      ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
index bebdf7b..487cac0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
@@ -20,57 +20,57 @@ package org.apache.beam.runners.jstorm.translation.translator;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.FluentIterable;
+import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.sdk.transforms.PTransform;
-
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.util.Map;
-
 /**
  * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
  */
 public interface TransformTranslator<T extends PTransform<?, ?>> {
 
-    void translateNode(T transform, TranslationContext context);
+  void translateNode(T transform, TranslationContext context);
 
-    /**
-     * Returns true if this translator can translate the given transform.
-     */
-    boolean canTranslate(T transform, TranslationContext context);
+  /**
+   * Returns true if this translator can translate the given transform.
+   */
+  boolean canTranslate(T transform, TranslationContext context);
 
-    class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
-        @Override
-        public void translateNode(T1 transform, TranslationContext context) {
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
 
-        }
+    }
 
-        @Override
-        public boolean canTranslate(T1 transform, TranslationContext context) {
-            return true;
-        }
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
 
-        static String describeTransform(
-                PTransform<?, ?> transform,
-                Map<TupleTag<?>, PValue> inputs,
-                Map<TupleTag<?>, PValue> outputs) {
-            return String.format("%s --> %s --> %s",
-                    Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
-                            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
-                                @Override
-                                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
-                                    return taggedPValue.getKey().getId();
-                                    // return taggedPValue.getValue().getName();
-                                }})),
-                    transform.getName(),
-                    Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
-                            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
-                                @Override
-                                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
-                                    return taggedPvalue.getKey().getId();
-                                    //return taggedPValue.getValue().getName();
-                                }})));
-        }
+    static String describeTransform(
+        PTransform<?, ?> transform,
+        Map<TupleTag<?>, PValue> inputs,
+        Map<TupleTag<?>, PValue> outputs) {
+      return String.format("%s --> %s --> %s",
+          Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
+              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                @Override
+                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
+                  return taggedPValue.getKey().getId();
+                  // return taggedPValue.getValue().getName();
+                }
+              })),
+          transform.getName(),
+          Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
+              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                @Override
+                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
+                  return taggedPvalue.getKey().getId();
+                  //return taggedPValue.getValue().getName();
+                }
+              })));
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
index ac7d7bd..33ac024 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
@@ -17,30 +17,30 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-
 /**
  * Translates a Read.Unbounded into a Storm spout.
- * 
+ *
  * @param <T>
  */
 public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
-    public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+  public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
 
-        TupleTag<?> tag = userGraphContext.getOutputTag();
-        PValue output = userGraphContext.getOutput();
+    TupleTag<?> tag = userGraphContext.getOutputTag();
+    PValue output = userGraphContext.getOutput();
 
-        UnboundedSourceSpout spout = new UnboundedSourceSpout(
-                description,
-                transform.getSource(), userGraphContext.getOptions(), tag);
-        context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
-    }
+    UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        description,
+        transform.getSource(), userGraphContext.getOptions(), tag);
+    context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
index 0ebf837..c55c8d6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,342 +37,342 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 /**
  * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
  */
-public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
-    @Override
-    public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-        ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
-        context.addTransformExecutor(viewExecutor);
+public class ViewTranslator
+    extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+  @Override
+  public void translateNode(
+      CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description = describeTransform(
+        transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+    ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
+    context.addTransformExecutor(viewExecutor);
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the JStorm runner in streaming mode.
+   */
+  public static class ViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsMap(View.AsMap<K, V> transform) {
     }
 
-    /**
-     * Specialized implementation for
-     * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-     * for the Flink runner in streaming mode.
-     */
-    public static class ViewAsMap<K, V>
-            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-        @SuppressWarnings("unused") // used via reflection in JstormRunner#apply()
-        public ViewAsMap(View.AsMap<K, V> transform) {
-        }
-
-        @Override
-        public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-            PCollectionView<Map<K, V>> view =
-                    PCollectionViews.mapView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            @SuppressWarnings({"rawtypes", "unchecked"})
-            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-            try {
-                inputCoder.getKeyCoder().verifyDeterministic();
-            } catch (Coder.NonDeterministicException e) {
-                // TODO: log warning as other runners.
-            }
-
-            return input
-                    .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-        }
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // TODO: log warning as other runners.
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
 
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsMap";
-        }
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
     }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * View.AsMultimap View.AsMultimap} for the
+   * JStorm runner in streaming mode.
+   */
+  public static class ViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
 
     /**
-     * Specialized expansion for {@link
-     * View.AsMultimap View.AsMultimap} for the
-     * Flink runner in streaming mode.
+     * Builds an instance of this class from the overridden transform.
      */
-    public static class ViewAsMultimap<K, V>
-            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsMultimap(View.AsMultimap<K, V> transform) {
-        }
-
-        @Override
-        public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-            PCollectionView<Map<K, Iterable<V>>> view =
-                    PCollectionViews.multimapView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            @SuppressWarnings({"rawtypes", "unchecked"})
-            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-            try {
-                inputCoder.getKeyCoder().verifyDeterministic();
-            } catch (Coder.NonDeterministicException e) {
-                // TODO: log warning as other runners.
-            }
-
-            return input
-                    .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-        }
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+    }
 
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsMultimap";
-        }
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // TODO: log warning as other runners.
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
     }
 
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsList View.AsList} for the
+   * JStorm runner in streaming mode.
+   */
+  public static class ViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
     /**
-     * Specialized implementation for
-     * {@link View.AsList View.AsList} for the
-     * JStorm runner in streaming mode.
+     * Builds an instance of this class from the overridden transform.
      */
-    public static class ViewAsList<T>
-            extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsList(View.AsList<T> transform) {}
-
-        @Override
-        public PCollectionView<List<T>> expand(PCollection<T> input) {
-            PCollectionView<List<T>> view =
-                    PCollectionViews.listView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
-        }
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsList(View.AsList<T> transform) {
+    }
 
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsList";
-        }
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
     }
 
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsIterable View.AsIterable} for the
+   * JStorm runner in streaming mode.
+   */
+  public static class ViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
     /**
-     * Specialized implementation for
-     * {@link View.AsIterable View.AsIterable} for the
-     * JStorm runner in streaming mode.
+     * Builds an instance of this class from the overridden transform.
      */
-    public static class ViewAsIterable<T>
-            extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsIterable(View.AsIterable<T> transform) { }
-
-        @Override
-        public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-            PCollectionView<Iterable<T>> view =
-                    PCollectionViews.iterableView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsIterable";
-        }
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsIterable(View.AsIterable<T> transform) {
     }
 
-    /**
-     * Specialized expansion for
-     * {@link View.AsSingleton View.AsSingleton} for the
-     * JStorm runner in streaming mode.
-     */
-    public static class ViewAsSingleton<T>
-            extends PTransform<PCollection<T>, PCollectionView<T>> {
-        private View.AsSingleton<T> transform;
-
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsSingleton(View.AsSingleton<T> transform) {
-            this.transform = transform;
-        }
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
+    }
 
-        @Override
-        public PCollectionView<T> expand(PCollection<T> input) {
-            Combine.Globally<T, T> combine = Combine.globally(
-                    new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-            if (!transform.hasDefaultValue()) {
-                combine = combine.withoutDefaults();
-            }
-            return input.apply(combine.asSingletonView());
-        }
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
 
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsSingleton";
-        }
+  /**
+   * Specialized expansion for
+   * {@link View.AsSingleton View.AsSingleton} for the
+   * JStorm runner in streaming mode.
+   */
+  public static class ViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+    private View.AsSingleton<T> transform;
 
-        private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-            private boolean hasDefaultValue;
-            private T defaultValue;
-
-            SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-                this.hasDefaultValue = hasDefaultValue;
-                this.defaultValue = defaultValue;
-            }
-
-            @Override
-            public T apply(T left, T right) {
-                throw new IllegalArgumentException("PCollection with more than one element "
-                        + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-                        + "combine the PCollection into a single value");
-            }
-
-            @Override
-            public T identity() {
-                if (hasDefaultValue) {
-                    return defaultValue;
-                } else {
-                    throw new IllegalArgumentException(
-                            "Empty PCollection accessed as a singleton view. "
-                                    + "Consider setting withDefault to provide a default value");
-                }
-            }
-        }
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsSingleton(View.AsSingleton<T> transform) {
+      this.transform = transform;
     }
 
-    public static class CombineGloballyAsSingletonView<InputT, OutputT>
-            extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-        public CombineGloballyAsSingletonView(
-                Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-            this.transform = transform;
-        }
-
-        @Override
-        public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-            PCollection<OutputT> combined =
-                    input.apply(Combine.globally(transform.getCombineFn())
-                            .withoutDefaults()
-                            .withFanout(transform.getFanout()));
-
-            PCollectionView<OutputT> view = PCollectionViews.singletonView(
-                    combined,
-                    combined.getWindowingStrategy(),
-                    transform.getInsertDefault(),
-                    transform.getInsertDefault()
-                            ? transform.getCombineFn().defaultValue() : null,
-                    combined.getCoder());
-            return combined
-                    .apply(ParDo.of(new WrapAsList<OutputT>()))
-                    .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
-        }
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
 
-        @Override
-        protected String getKindString() {
-            return "StreamingCombineGloballyAsSingletonView";
-        }
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
     }
 
-    private static class WrapAsList<T> extends DoFn<T, List<T>> {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-            c.output(Collections.singletonList(c.element()));
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
         }
+      }
     }
+  }
+
+  public static class CombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
 
     /**
-     * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-     * They require the input {@link PCollection} fits in memory.
-     * For a large {@link PCollection} this is expected to crash!
-     *
-     * @param <T> the type of elements to concatenate.
+     * Builds an instance of this class from the overridden transform.
      */
-    private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-        private static final long serialVersionUID = 1L;
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public CombineGloballyAsSingletonView(
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
 
-        @Override
-        public List<T> createAccumulator() {
-            return new ArrayList<>();
-        }
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined,
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
+    }
 
-        @Override
-        public List<T> addInput(List<T> accumulator, T input) {
-            accumulator.add(input);
-            return accumulator;
-        }
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
 
-        @Override
-        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-            List<T> result = createAccumulator();
-            for (List<T> accumulator : accumulators) {
-                result.addAll(accumulator);
-            }
-            return result;
-        }
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   * They require the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    private static final long serialVersionUID = 1L;
 
-        @Override
-        public List<T> extractOutput(List<T> accumulator) {
-            return accumulator;
-        }
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
 
-        @Override
-        public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-            return ListCoder.of(inputCoder);
-        }
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
 
-        @Override
-        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-            return ListCoder.of(inputCoder);
-        }
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
     }
 
-    /**
-     * Creates a primitive {@link PCollectionView}.
-     *
-     * <p>For internal use only by runner implementors.
-     *
-     * @param <ElemT> The type of the elements of the input PCollection
-     * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-     */
-    public static class CreateJStormPCollectionView<ElemT, ViewT>
-            extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-        private PCollectionView<ViewT> view;
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
 
-        private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
-            this.view = view;
-        }
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
 
-        public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
-                PCollectionView<ViewT> view) {
-            return new CreateJStormPCollectionView<>(view);
-        }
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateJStormPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
 
-        @Override
-        public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-            return view;
-        }
+    public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateJStormPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
index 0bf9a49..6de34dd 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
@@ -17,22 +17,22 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
 import org.apache.beam.sdk.transforms.windowing.Window;
 
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-
 public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
 
-    @Override
-    public void translateNode(Window.Assign<T> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-        context.getUserGraphContext().setWindowed();
-        WindowAssignExecutor executor = new WindowAssignExecutor(
-                description,
-                transform.getWindowFn(),
-                userGraphContext.getOutputTag());
-        context.addTransformExecutor(executor);
-    }
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+    context.getUserGraphContext().setWindowed();
+    WindowAssignExecutor executor = new WindowAssignExecutor(
+        description,
+        transform.getWindowFn(),
+        userGraphContext.getOutputTag());
+    context.addTransformExecutor(executor);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
index b67aff9..c863c9e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
@@ -21,27 +21,27 @@ import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Translates a Window.Bound node into a Storm WindowedBolt
- * 
+ *
  * @param <T>
  */
 public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
 
-    // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
-    @Override
-    public void translateNode(Window.Assign<T> transform, TranslationContext context) {
-        if (transform.getWindowFn() instanceof FixedWindows) {
-            context.getUserGraphContext().setWindowed();
-        } else if (transform.getWindowFn() instanceof SlidingWindows) {
-            context.getUserGraphContext().setWindowed();
-        } else {
-            throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn());
-        }
+  // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    if (transform.getWindowFn() instanceof FixedWindows) {
+      context.getUserGraphContext().setWindowed();
+    } else if (transform.getWindowFn() instanceof SlidingWindows) {
+      context.getUserGraphContext().setWindowed();
+    } else {
+      throw new UnsupportedOperationException(
+          "Not supported window type currently: " + transform.getWindowFn());
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
index 07a3ad5..596d8b4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.jstorm.translation.util;
 
 public class CommonInstance {
-    public static final String KEY = "Key";
-    public static final String VALUE = "Value";
+  public static final String KEY = "Key";
+  public static final String VALUE = "Value";
 
-    public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+  public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
index 87562fd..750095e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
@@ -17,30 +17,29 @@
  */
 package org.apache.beam.runners.jstorm.translation.util;
 
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
 /**
  * No-op SideInputReader implementation.
  */
 public class DefaultSideInputReader implements SideInputReader, Serializable {
-    @Nullable
-    @Override
-    public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
-        return null;
-    }
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
+    return null;
+  }
 
-    @Override
-    public <T> boolean contains(PCollectionView<T> pCollectionView) {
-        return false;
-    }
+  @Override
+  public <T> boolean contains(PCollectionView<T> pCollectionView) {
+    return false;
+  }
 
-    @Override
-    public boolean isEmpty() {
-        return true;
-    }
+  @Override
+  public boolean isEmpty() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
index 481b7fb..4eb1d8f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
@@ -17,73 +17,74 @@
  */
 package org.apache.beam.runners.jstorm.translation.util;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
 import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Default StepContext for running DoFn. This does not allow accessing state or timer internals.
  */
 public class DefaultStepContext implements ExecutionContext.StepContext {
 
-    private TimerInternals timerInternals;
+  private TimerInternals timerInternals;
 
-    private StateInternals stateInternals;
+  private StateInternals stateInternals;
 
-    public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
-        this.timerInternals = checkNotNull(timerInternals, "timerInternals");
-        this.stateInternals = checkNotNull(stateInternals, "stateInternals");
-    }
+  public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
+    this.timerInternals = checkNotNull(timerInternals, "timerInternals");
+    this.stateInternals = checkNotNull(stateInternals, "stateInternals");
+  }
 
-    @Override
-    public String getStepName() {
-        return null;
-    }
+  @Override
+  public String getStepName() {
+    return null;
+  }
 
-    @Override
-    public String getTransformName() {
-        return null;
-    }
+  @Override
+  public String getTransformName() {
+    return null;
+  }
 
-    @Override
-    public void noteOutput(WindowedValue<?> windowedValue) {
+  @Override
+  public void noteOutput(WindowedValue<?> windowedValue) {
 
-    }
+  }
 
-    @Override
-    public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+  @Override
+  public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
 
-    }
+  }
 
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
-            Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException {
-        throw new UnsupportedOperationException("Writing side-input data is not supported.");
-    }
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder)
+      throws IOException {
+    throw new UnsupportedOperationException("Writing side-input data is not supported.");
+  }
 
-    @Override
-    public StateInternals stateInternals() {
-        return stateInternals;
-    }
+  @Override
+  public StateInternals stateInternals() {
+    return stateInternals;
+  }
 
-    @Override
-    public TimerInternals timerInternals() {
-        return timerInternals;
-    }
+  @Override
+  public TimerInternals timerInternals() {
+    return timerInternals;
+  }
 
-    public void setStateInternals(StateInternals stateInternals) {
-        this.stateInternals = stateInternals;
-    }
+  public void setStateInternals(StateInternals stateInternals) {
+    this.stateInternals = stateInternals;
+  }
 
-    public void setTimerInternals(TimerInternals timerInternals) {
-        this.timerInternals = timerInternals;
-    }
+  public void setTimerInternals(TimerInternals timerInternals) {
+    this.timerInternals = timerInternals;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
index cbf815a..9fd62e4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
@@ -17,37 +17,37 @@
  */
 package org.apache.beam.runners.jstorm.util;
 
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.jstorm.translation.runtime.Executor;
-
 import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
 import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
 import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 public class RunnerUtils {
-    /**
-     * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>>
-     * @param elem
-     * @return
-     */
-    public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
-        WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
-        SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
-                kvElem.getValue().getKey(),
-                kvElem.withValue(kvElem.getValue().getValue()));
-        return workItem;
-    }
+  /**
+   * Convert WindowedValue<KV<K, V>> into KeyedWorkItem<K, V>.
+   *
+   * @param elem
+   * @return
+   */
+  public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
+    WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
+    SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
+        kvElem.getValue().getKey(),
+        kvElem.withValue(kvElem.getValue().getValue()));
+    return workItem;
+  }
 
-    public static boolean isGroupByKeyExecutor (Executor executor) {
-        if (executor instanceof GroupByWindowExecutor) {
-            return true;
-        } else if (executor instanceof StatefulDoFnExecutor ||
-                executor instanceof MultiStatefulDoFnExecutor) {
-            return true;
-        } else {
-            return false;
-        }
+  public static boolean isGroupByKeyExecutor(Executor executor) {
+    if (executor instanceof GroupByWindowExecutor) {
+      return true;
+    } else if (executor instanceof StatefulDoFnExecutor ||
+        executor instanceof MultiStatefulDoFnExecutor) {
+      return true;
+    } else {
+      return false;
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
index 391699b..182794f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
@@ -18,47 +18,48 @@
 
 package org.apache.beam.runners.jstorm.util;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.beam.sdk.options.PipelineOptions;
+import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
  */
 public class SerializedPipelineOptions implements Serializable {
 
-    private final byte[] serializedOptions;
+  private final byte[] serializedOptions;
 
-    /** Lazily initialized copy of deserialized options */
-    private transient PipelineOptions pipelineOptions;
+  /**
+   * Lazily initialized copy of deserialized options
+   */
+  private transient PipelineOptions pipelineOptions;
 
-    public SerializedPipelineOptions(PipelineOptions options) {
-        checkNotNull(options, "PipelineOptions must not be null.");
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            new ObjectMapper().writeValue(baos, options);
-            this.serializedOptions = baos.toByteArray();
-        } catch (Exception e) {
-            throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-        }
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
 
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
     }
 
-    public PipelineOptions getPipelineOptions() {
-        if (pipelineOptions == null) {
-            try {
-                pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
-            } catch (IOException e) {
-                throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-            }
-        }
+  }
 
-        return pipelineOptions;
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
     }
 
+    return pipelineOptions;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
index dee5f1a..cce21b3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * Singleton keyed work item.
+ *
  * @param <K>
  * @param <ElemT>
  */
@@ -38,7 +39,7 @@ public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
   }
 
   public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
-      return new SingletonKeyedWorkItem<K, ElemT>(key, value);
+    return new SingletonKeyedWorkItem<K, ElemT>(key, value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
index 344d3c7..0d6fc23 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
@@ -34,7 +34,7 @@ public class JStormRunnerRegistrarTest {
   @Test
   public void testFullName() {
     String[] args =
-        new String[] {String.format("--runner=%s", JStormRunner.class.getName())};
+        new String[]{String.format("--runner=%s", JStormRunner.class.getName())};
     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
     assertEquals(opts.getRunner(), JStormRunner.class);
   }
@@ -42,7 +42,7 @@ public class JStormRunnerRegistrarTest {
   @Test
   public void testClassName() {
     String[] args =
-        new String[] {String.format("--runner=%s", JStormRunner.class.getSimpleName())};
+        new String[]{String.format("--runner=%s", JStormRunner.class.getSimpleName())};
     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
     assertEquals(opts.getRunner(), JStormRunner.class);
   }


Mime
View raw message