beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [28/50] [abbrv] incubator-beam git commit: [flink] adjust directories according to package name
Date Fri, 04 Mar 2016 18:11:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
deleted file mode 100644
index d0423b9..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.io.Sink;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-import com.google.cloud.dataflow.sdk.transforms.Write;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-
-/**
- * Flink {@link OutputFormat} that writes records through the Dataflow {@link Sink} held by a
- * {@link Write.Bound} transform.
- *
- * <p>The {@code pipelineOptions} field is transient; it is written and read back as JSON with a
- * Jackson {@link ObjectMapper} in the custom {@code writeObject}/{@code readObject} hooks.
- *
- * @param <T> The type of the incoming records.
- */
-public class SinkOutputFormat<T> implements OutputFormat<T> {
-
-  private final Sink<T> sink;
-
-  private transient PipelineOptions pipelineOptions;
-
-  private Sink.WriteOperation<T, ?> writeOperation;
-  private Sink.Writer<T, ?> writer;
-
-  // Prefix of the id handed to each writer in open(); the task number is appended to it.
-  private AbstractID uid = new AbstractID();
-
-  /**
-   * @param transform the write transform whose sink is wrapped
-   * @param pipelineOptions the pipeline options; must not be null
-   */
-  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = extractSink(transform);
-    this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
-  }
-
-  /**
-   * Reads the {@code sink} field of the transform reflectively.
-   *
-   * @throws RuntimeException if the field does not exist or cannot be accessed
-   */
-  private Sink<T> extractSink(Write.Bound<T> transform) {
-    // TODO possibly add a getter in the upstream
-    try {
-      Field sinkField = transform.getClass().getDeclaredField("sink");
-      sinkField.setAccessible(true);
-      @SuppressWarnings("unchecked")
-      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
-      return extractedSink;
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new RuntimeException("Could not acquire custom sink field.", e);
-    }
-  }
-
-  /**
-   * Creates and initializes the sink's write operation.
-   *
-   * @throws RuntimeException if initializing the write operation fails
-   */
-  @Override
-  public void configure(Configuration configuration) {
-    writeOperation = sink.createWriteOperation(pipelineOptions);
-    try {
-      writeOperation.initialize(pipelineOptions);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to initialize the write operation.", e);
-    }
-  }
-
-  /**
-   * Creates a writer from the write operation and opens it under the id
-   * {@code <uid>-<taskNumber>}.
-   *
-   * @throws IOException if the writer cannot be created or opened
-   */
-  @Override
-  public void open(int taskNumber, int numTasks) throws IOException {
-    try {
-      writer = writeOperation.createWriter(pipelineOptions);
-    } catch (Exception e) {
-      throw new IOException("Couldn't create writer.", e);
-    }
-    try {
-      writer.open(uid + "-" + taskNumber);
-    } catch (Exception e) {
-      throw new IOException("Couldn't open writer.", e);
-    }
-  }
-
-  /**
-   * Forwards a single record to the writer.
-   *
-   * @throws IOException if the writer fails to write the record
-   */
-  @Override
-  public void writeRecord(T record) throws IOException {
-    try {
-      writer.write(record);
-    } catch (Exception e) {
-      throw new IOException("Couldn't write record.", e);
-    }
-  }
-
-  /**
-   * Closes the writer, if one was created.
-   *
-   * @throws IOException if the writer fails to close
-   */
-  @Override
-  public void close() throws IOException {
-    if (writer == null) {
-      // open() never ran, or failed before a writer existed; there is nothing to close.
-      return;
-    }
-    try {
-      writer.close();
-    } catch (Exception e) {
-      throw new IOException("Couldn't close writer.", e);
-    }
-  }
-
-  /** Serialization hook: default fields first, then the transient options as JSON. */
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, pipelineOptions);
-  }
-
-  /** Deserialization hook: mirrors {@code writeObject} and restores the options from JSON. */
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
deleted file mode 100644
index 2d62416..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
- * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
- *
- * <p>The initial {@link BoundedSource} is split into bundles in {@link #createInputSplits(int)};
- * each bundle travels inside a {@link SourceInputSplit} and is read with its own reader.
- * The {@code options} field is transient and is written/read as JSON with a Jackson
- * {@link ObjectMapper} in the custom serialization hooks.
- */
-public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> {
-  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
-
-  private final BoundedSource<T> initialSource;
-  private transient PipelineOptions options;
-
-  private BoundedSource.BoundedReader<T> reader = null;
-  // True while no split is open and once the open reader has no further record.
-  private boolean reachedEnd = true;
-
-  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
-    this.initialSource = initialSource;
-    this.options = options;
-  }
-
-  /** Serialization hook: default fields first, then the transient options as JSON. */
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  /** Deserialization hook: mirrors {@code writeObject} and restores the options from JSON. */
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
-  }
-
-  @Override
-  public void configure(Configuration configuration) {}
-
-  /**
-   * Creates a reader for the source carried by the split and positions it on the first record.
-   *
-   * @throws IOException if the reader cannot be created or started
-   */
-  @Override
-  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
-    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
-    // A reader has to be started before getCurrent()/advance() may be used; start() also
-    // reports whether a first record is available.
-    reachedEnd = !reader.start();
-  }
-
-  /**
-   * Reports the estimated size of the initial source; record count and width are unknown.
-   *
-   * @return the statistics, or {@code null} if the size estimate could not be obtained
-   */
-  @Override
-  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
-    try {
-      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-
-      return new BaseStatistics() {
-        @Override
-        public long getTotalInputSize() {
-          return estimatedSize;
-
-        }
-
-        @Override
-        public long getNumberOfRecords() {
-          return BaseStatistics.NUM_RECORDS_UNKNOWN;
-        }
-
-        @Override
-        public float getAverageRecordWidth() {
-          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
-        }
-      };
-    } catch (Exception e) {
-      // Pass the exception as the throwable argument (no placeholder) so the stack trace is
-      // logged and no stray "{}" ends up in the message.
-      LOG.warn("Could not read Source statistics.", e);
-    }
-
-    return null;
-  }
-
-  /**
-   * Splits the initial source into bundles of roughly {@code estimatedSize / numSplits} bytes
-   * and wraps each bundle in a consecutively numbered {@link SourceInputSplit}.
-   *
-   * @throws IOException if estimating the size or splitting the source fails
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
-    try {
-      // Clamp the hint so a non-positive value cannot cause a division by zero.
-      long desiredSizeBytes =
-          initialSource.getEstimatedSizeBytes(options) / Math.max(1, numSplits);
-      List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
-          options);
-      List<SourceInputSplit<T>> splits = new ArrayList<>(shards.size());
-      int splitCount = 0;
-      for (Source<T> shard: shards) {
-        splits.add(new SourceInputSplit<>(shard, splitCount++));
-      }
-      return splits.toArray(new SourceInputSplit[splits.size()]);
-    } catch (Exception e) {
-      throw new IOException("Could not create input splits from Source.", e);
-    }
-  }
-
-  /**
-   * Returns an assigner that hands out the splits in array order, ignoring host and task id,
-   * and returns {@code null} once all splits have been handed out.
-   */
-  @Override
-  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
-    return new InputSplitAssigner() {
-      private int index = 0;
-      private final SourceInputSplit[] splits = sourceInputSplits;
-      @Override
-      public InputSplit getNextInputSplit(String host, int taskId) {
-        if (index < splits.length) {
-          return splits[index++];
-        } else {
-          return null;
-        }
-      }
-    };
-  }
-
-
-  @Override
-  public boolean reachedEnd() throws IOException {
-    return reachedEnd;
-  }
-
-  /**
-   * Returns the record the reader is currently positioned on and then moves the reader
-   * forward, so that {@link #reachedEnd()} is accurate before the next call.
-   *
-   * @return the next record, or {@code null} if the end of the split has been reached
-   */
-  @Override
-  public T nextRecord(T t) throws IOException {
-    if (reachedEnd) {
-      return null;
-    }
-    final T current = reader.getCurrent();
-    reachedEnd = !reader.advance();
-    return current;
-  }
-
-  /** Closes the current reader, if a split was opened. */
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
deleted file mode 100644
index 1b45ad7..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.cloud.dataflow.sdk.io.Source;
-import org.apache.flink.core.io.InputSplit;
-
-/**
- * The {@link org.apache.flink.core.io.InputSplit} type used by
- * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}.
- *
- * <p>Sharding a Source simply yields several smaller Sources, so each split carries the
- * sharded Source itself. This differs from the usual Flink approach, where an InputFormat
- * creates separate InputSplit descriptions.
- */
-public class SourceInputSplit<T> implements InputSplit {
-
-  /** The shard of the original source that this split stands for. */
-  private Source<T> source;
-
-  /** The number of this split, as reported by {@link #getSplitNumber()}. */
-  private int splitNumber;
-
-  /** Creates an empty split; both fields keep their default values. */
-  public SourceInputSplit() {
-  }
-
-  /**
-   * @param source the sharded source carried by this split
-   * @param splitNumber the number of this split
-   */
-  public SourceInputSplit(Source<T> source, int splitNumber) {
-    this.splitNumber = splitNumber;
-    this.source = source;
-  }
-
-  /** @return the sharded source carried by this split */
-  public Source<T> getSource() {
-    return this.source;
-  }
-
-  @Override
-  public int getSplitNumber() {
-    return this.splitNumber;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
deleted file mode 100644
index e2ceae6..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.*;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Throwables;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-import java.util.Collection;
-
-/**
- * An abstract class that encapsulates the common code of the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound}
- * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and
- * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations.
- *
- * <p>The {@code options} field is transient; it is carried across Java serialization as a
- * length-prefixed JSON byte array by the {@code writeObject}/{@code readObject} hooks, so that
- * {@code getPipelineOptions()} still works on a deserialized instance.
- * */
-public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
-
-  private final DoFn<IN, OUTDF> doFn;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private transient PipelineOptions options;
-
-  // Created lazily on the first call to flatMap().
-  private DoFnProcessContext context;
-
-  /**
-   * @param options the pipeline options exposed to the DoFn; must not be null
-   * @param windowingStrategy the strategy used to assign windows to outputs; must not be null
-   * @param doFn the user function to run; must not be null
-   */
-  public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
-    Preconditions.checkNotNull(options);
-    Preconditions.checkNotNull(windowingStrategy);
-    Preconditions.checkNotNull(doFn);
-
-    this.doFn = doFn;
-    this.options = options;
-    this.windowingStrategy = windowingStrategy;
-  }
-
-  /**
-   * Serialization hook: writes the default fields, then the transient options as a
-   * length-prefixed JSON byte array. The explicit length keeps the stream position exact, so
-   * the serialized fields of subclasses that follow are not disturbed.
-   */
-  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-    out.defaultWriteObject();
-    byte[] optionsJson =
-        new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsBytes(options);
-    out.writeInt(optionsJson.length);
-    out.write(optionsJson);
-  }
-
-  /** Deserialization hook: mirrors {@code writeObject} and restores the options from JSON. */
-  private void readObject(java.io.ObjectInputStream in)
-      throws java.io.IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    byte[] optionsJson = new byte[in.readInt()];
-    in.readFully(optionsJson);
-    options = new com.fasterxml.jackson.databind.ObjectMapper()
-        .readValue(optionsJson, PipelineOptions.class);
-  }
-
-  /** Creates the process context on first use; later calls keep the existing context. */
-  private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
-    if (this.context == null) {
-      this.context = new DoFnProcessContext(function, outCollector);
-    }
-  }
-
-  /**
-   * Runs the DoFn on the incoming value. A value that belongs to more than one window is
-   * processed once per window, each time as a single-window copy.
-   */
-  @Override
-  public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
-    this.initContext(doFn, out);
-
-    // for each window the element belongs to, create a new copy here.
-    Collection<? extends BoundedWindow> windows = value.getWindows();
-    if (windows.size() <= 1) {
-      processElement(value);
-    } else {
-      for (BoundedWindow window : windows) {
-        processElement(WindowedValue.of(
-            value.getValue(), value.getTimestamp(), window, value.getPane()));
-      }
-    }
-  }
-
-  /** Processes one element, wrapped in its own startBundle/finishBundle pair. */
-  private void processElement(WindowedValue<IN> value) throws Exception {
-    this.context.setElement(value);
-    this.doFn.startBundle(context);
-    doFn.processElement(context);
-    this.doFn.finishBundle(context);
-  }
-
-  /**
-   * The context handed to the DoFn. It exposes the element set via {@link #setElement} and
-   * delegates all output calls to the abstract helper methods of the enclosing wrapper.
-   */
-  private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
-
-    private final DoFn<IN, OUTDF> fn;
-
-    protected final Collector<WindowedValue<OUTFL>> collector;
-
-    private WindowedValue<IN> element;
-
-    private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
-      function.super();
-      super.setupDelegateAggregators();
-
-      this.fn = function;
-      this.collector = outCollector;
-    }
-
-    public void setElement(WindowedValue<IN> value) {
-      this.element = value;
-    }
-
-    @Override
-    public IN element() {
-      return this.element.getValue();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return this.element.getTimestamp();
-    }
-
-    /**
-     * @throws UnsupportedOperationException if the DoFn is not a {@code RequiresWindowAccess}
-     * @throws IllegalArgumentException if the current element is not in exactly one window
-     */
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindow.");
-      }
-
-      Collection<? extends BoundedWindow> windows = this.element.getWindows();
-      if (windows.size() != 1) {
-        throw new IllegalArgumentException("Each element is expected to belong to 1 window. " +
-            "This belongs to " + windows.size() + ".");
-      }
-      return windows.iterator().next();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return this.element.getPane();
-    }
-
-    @Override
-    public WindowingInternals<IN, OUTDF> windowingInternals() {
-      return windowingInternalsHelper(element, collector);
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public void output(OUTDF output) {
-      outputWithTimestamp(output, this.element.getTimestamp());
-    }
-
-    @Override
-    public void outputWithTimestamp(OUTDF output, Instant timestamp) {
-      outputWithTimestampHelper(element, output, timestamp, collector);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutputWithTimestampHelper(element, output, timestamp, collector, tag);
-    }
-
-    /**
-     * Returns the accumulator already registered under {@code name} in the runtime context
-     * (after a type check), or registers and returns a new one built from {@code combiner}.
-     */
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Accumulator acc = getRuntimeContext().getAccumulator(name);
-      if (acc != null) {
-        AccumulatorHelper.compareAccumulatorTypes(name,
-            SerializableFnAggregatorWrapper.class, acc.getClass());
-        return (Aggregator<AggInputT, AggOutputT>) acc;
-      }
-
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
-          new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, accumulator);
-      return accumulator;
-    }
-  }
-
-  /**
-   * Verifies that an output timestamp is not earlier than the timestamp of {@code ref} minus
-   * the allowed skew of the DoFn.
-   *
-   * @throws IllegalArgumentException if the timestamp is too early
-   */
-  protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
-    if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
-      throw new IllegalArgumentException(String.format(
-          "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-              + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-              + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
-          timestamp, ref.getTimestamp(),
-          PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
-    }
-  }
-
-  /**
-   * Builds a windowed value for an output. A null timestamp is replaced by
-   * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}. If {@code windows} is null, the windows are
-   * assigned by the WindowFn of the windowing strategy, using a context that exposes only the
-   * originally supplied timestamp (element and windows are not available to it).
-   */
-  protected <T> WindowedValue<T> makeWindowedValue(
-      T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-    // Remember the caller's timestamp before it is defaulted below; the assign context must
-    // report "not available" rather than the substituted minimum value.
-    final Instant inputTimestamp = timestamp;
-    final WindowFn windowFn = windowingStrategy.getWindowFn();
-
-    if (timestamp == null) {
-      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    if (windows == null) {
-      try {
-        windows = windowFn.assignWindows(windowFn.new AssignContext() {
-          @Override
-          public Object element() {
-            throw new UnsupportedOperationException(
-                "WindowFn attempted to access input element when none was available");
-          }
-
-          @Override
-          public Instant timestamp() {
-            if (inputTimestamp == null) {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input timestamp when none was available");
-            }
-            return inputTimestamp;
-          }
-
-          @Override
-          public Collection<? extends BoundedWindow> windows() {
-            throw new UnsupportedOperationException(
-                "WindowFn attempted to access input windows when none were available");
-          }
-        });
-      } catch (Exception e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-
-    return WindowedValue.of(output, timestamp, windows, pane);
-  }
-
-  ///////////      ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES      /////////////////
-
-  /** Called for every main output (with or without an explicit timestamp) of the DoFn. */
-  public abstract void outputWithTimestampHelper(
-      WindowedValue<IN> inElement,
-      OUTDF output,
-      Instant timestamp,
-      Collector<WindowedValue<OUTFL>> outCollector);
-
-  /** Called for every side output (with or without an explicit timestamp) of the DoFn. */
-  public abstract <T> void sideOutputWithTimestampHelper(
-      WindowedValue<IN> inElement,
-      T output,
-      Instant timestamp,
-      Collector<WindowedValue<OUTFL>> outCollector,
-      TupleTag<T> tag);
-
-  /** Supplies the WindowingInternals returned by the process context for the given element. */
-  public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
-      WindowedValue<IN> inElement,
-      Collector<WindowedValue<OUTFL>> outCollector);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
deleted file mode 100644
index 906a399..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ /dev/null
@@ -1,629 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*;
-import com.google.cloud.dataflow.sdk.coders.*;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.*;
-import com.google.cloud.dataflow.sdk.values.*;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.operators.*;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * This class is the key class implementing all the windowing/triggering logic of Apache Beam.
- * To provide full compatibility and support for all the windowing/triggering combinations offered by
- * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in
- * {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}.
- * <p/>
- * In a nutshell, when execution arrives at this operator, we expect to have a stream <b>already
- * grouped by key</b>. Each element that enters here registers a timer
- * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)}) in
- * {@link FlinkGroupAlsoByWindowWrapper#activeTimers}.
- * This is essentially a timestamp indicating when to trigger the computation over the window this
- * element belongs to.
- * <p/>
- * When a watermark arrives, all the registered timers are checked to see which ones are ready to
- * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from
- * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
- * list, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}
- * for further processing.
- */
-public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
-    extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
-    implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
-
-  private static final long serialVersionUID = 1L;
-
-  private transient PipelineOptions options;
-
-  private transient CoderRegistry coderRegistry;
-
-  private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
-
-  private ProcessContext context;
-
-  private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
-
-  private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
-
-  private final KvCoder<K, VIN> inputKvCoder;
-
-  /**
-   * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a
-   * key whose elements are currently waiting to be processed, and its associated state.
-   */
-  private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
-
-  /**
-   * Timers waiting to be processed.
-   */
-  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-  private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
-
-  /**
-   * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
-   * This method assumes that <b>elements are already grouped by key</b>.
-   * <p/>
-   * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
-   * is that this method assumes that a combiner function is provided
-   * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
-   * A combiner helps increase the speed and, in most cases, reduces the per-window state.
-   *
-   * @param options            the general job configuration options.
-   * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
-   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
-   * @param combiner           the combiner to be used.
-   * @param outputKvCoder      the type of the output values.
-   */
-  public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
-      PipelineOptions options,
-      PCollection input,
-      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
-      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
-      KvCoder<K, VOUT> outputKvCoder) {
-    Preconditions.checkNotNull(options);
-
-    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
-    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
-        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
-
-    Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-        outputKvCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
-        new CoderTypeInformation<>(windowedOutputElemCoder);
-
-    DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
-        .transform("GroupByWindowWithCombiner",
-            new CoderTypeInformation<>(outputKvCoder),
-            windower)
-        .returns(outputTypeInfo);
-
-    return groupedByKeyAndWindow;
-  }
-
-  /**
-   * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
-   * This method assumes that <b>elements are already grouped by key</b>.
-   * <p/>
-   * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
-   * is that this method assumes no combiner function
-   * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
-   *
-   * @param options            the general job configuration options.
-   * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
-   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
-   */
-  public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
-      PipelineOptions options,
-      PCollection input,
-      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
-    Preconditions.checkNotNull(options);
-
-    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-    Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
-        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
-
-    Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
-    KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
-
-    Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-        outputElemCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
-        new CoderTypeInformation<>(windowedOutputElemCoder);
-
-    DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
-        .transform("GroupByWindow",
-            new CoderTypeInformation<>(windowedOutputElemCoder),
-            windower)
-        .returns(outputTypeInfo);
-
-    return groupedByKeyAndWindow;
-  }
-
-  public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
-  createForTesting(PipelineOptions options,
-                   CoderRegistry registry,
-                   WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
-                   KvCoder<K, VIN> inputCoder,
-                   Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-    Preconditions.checkNotNull(options);
-
-    return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
-  }
-
-  private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
-                                        CoderRegistry registry,
-                                        WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
-                                        KvCoder<K, VIN> inputCoder,
-                                        Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-    Preconditions.checkNotNull(options);
-
-    this.options = Preconditions.checkNotNull(options);
-    this.coderRegistry = Preconditions.checkNotNull(registry);
-    this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
-    this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
-    this.combineFn = combiner;
-    this.operator = createGroupAlsoByWindowOperator();
-    this.chainingStrategy = ChainingStrategy.ALWAYS;
-  }
-
-  @Override
-  public void open() throws Exception {
-    super.open();
-    this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
-  }
-
-  /**
-   * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
-   * <b> if not already created</b>.
-   * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then
-   * a function with that combiner is created, so that elements are combined as they arrive. This is
-   * done for speed and (in most of the cases) for reduction of the per-window state.
-   */
-  private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
-    if (this.operator == null) {
-      if (this.combineFn == null) {
-        // Thus VOUT == Iterable<VIN>
-        Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-        this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
-            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
-      } else {
-        Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
-
-        AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
-            .withInputCoder(combineFn, coderRegistry, inputKvCoder);
-
-        this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
-            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
-      }
-    }
-    return this.operator;
-  }
-
-  private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
-    context.setElement(workItem, getStateInternalsForKey(workItem.key()));
-
-    // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
-    operator.startBundle(context);
-    operator.processElement(context);
-    operator.finishBundle(context);
-  }
-
-  @Override
-  public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
-    ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
-    elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
-        element.getValue().getWindows(), element.getValue().getPane()));
-    processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
-  }
-
-  @Override
-  public void processWatermark(Watermark mark) throws Exception {
-    context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
-
-    Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
-    if (!timers.isEmpty()) {
-      for (K key : timers.keySet()) {
-        processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
-      }
-    }
-
-    /**
-     * This is to take into account the different semantics of the Watermark in Flink and
-     * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
-     * watermark holding logic, see the documentation of
-     * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
-     * */
-    long millis = Long.MAX_VALUE;
-    for (FlinkStateInternals state : perKeyStateInternals.values()) {
-      Instant watermarkHold = state.getWatermarkHold();
-      if (watermarkHold != null && watermarkHold.getMillis() < millis) {
-        millis = watermarkHold.getMillis();
-      }
-    }
-
-    if (mark.getTimestamp() < millis) {
-      millis = mark.getTimestamp();
-    }
-
-    context.setCurrentOutputWatermark(new Instant(millis));
-
-    // Don't forget to re-emit the watermark for further operators down the line.
-    // This is critical for jobs with multiple aggregation steps.
-    // Imagine a job with a groupByKey() on key K1, followed by a map() that changes
-    // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
-    // is not re-emitted, the second aggregation would never be triggered, and no result
-    // will be produced.
-    output.emitWatermark(new Watermark(millis));
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-  }
-
-  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
-    if (timersForKey == null) {
-      timersForKey = new HashSet<>();
-    }
-    timersForKey.add(timer);
-    activeTimers.put(key, timersForKey);
-  }
-
-  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
-    if (timersForKey != null) {
-      timersForKey.remove(timer);
-      if (timersForKey.isEmpty()) {
-        activeTimers.remove(key);
-      } else {
-        activeTimers.put(key, timersForKey);
-      }
-    }
-  }
-
-  /**
-   * Returns the list of timers that are ready to fire. These are the timers
-   * that are registered to be triggered at a time before the current watermark.
-   * We keep these timers in a Set, so that they are deduplicated, as the same
-   * timer can be registered multiple times.
-   */
-  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
-
-    // we keep the timers to return in a different list and launch them later
-    // because we cannot prevent a trigger from registering another trigger,
-    // which would lead to concurrent modification exception.
-    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
-
-    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
-      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
-      while (timerIt.hasNext()) {
-        TimerInternals.TimerData timerData = timerIt.next();
-        if (timerData.getTimestamp().isBefore(currentWatermark)) {
-          toFire.put(keyWithTimers.getKey(), timerData);
-          timerIt.remove();
-        }
-      }
-
-      if (keyWithTimers.getValue().isEmpty()) {
-        it.remove();
-      }
-    }
-    return toFire;
-  }
-
-  /**
-   * Gets the state associated with the specified key.
-   *
-   * @param key the key whose state we want.
-   * @return The {@link FlinkStateInternals}
-   * associated with that key.
-   */
-  private FlinkStateInternals<K> getStateInternalsForKey(K key) {
-    FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
-    if (stateInternals == null) {
-      Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-      OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
-      stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
-      perKeyStateInternals.put(key, stateInternals);
-    }
-    return stateInternals;
-  }
-
-  private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
-    @Override
-    public void setTimer(TimerData timerKey) {
-      registerActiveTimer(context.element().key(), timerKey);
-    }
-
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
-    }
-  }
-
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
-
-    private final FlinkTimerInternals timerInternals;
-
-    private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
-
-    private FlinkStateInternals<K> stateInternals;
-
-    private KeyedWorkItem<K, VIN> element;
-
-    public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
-                          TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
-                          FlinkTimerInternals timerInternals) {
-      function.super();
-      super.setupDelegateAggregators();
-
-      this.collector = Preconditions.checkNotNull(outCollector);
-      this.timerInternals = Preconditions.checkNotNull(timerInternals);
-    }
-
-    public void setElement(KeyedWorkItem<K, VIN> element,
-                           FlinkStateInternals<K> stateForKey) {
-      this.element = element;
-      this.stateInternals = stateForKey;
-    }
-
-    public void setCurrentInputWatermark(Instant watermark) {
-      this.timerInternals.setCurrentInputWatermark(watermark);
-    }
-
-    public void setCurrentOutputWatermark(Instant watermark) {
-      this.timerInternals.setCurrentOutputWatermark(watermark);
-    }
-
-    @Override
-    public KeyedWorkItem<K, VIN> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      // TODO: PipelineOptions need to be available on the workers.
-      // Ideally they are captured as part of the pipeline.
-      // For now, construct empty options so that StateContexts.createFromComponents
-      // will yield a valid StateContext, which is needed to support the StateContext.window().
-      if (options == null) {
-        options = new PipelineOptions() {
-          @Override
-          public <T extends PipelineOptions> T as(Class<T> kls) {
-            return null;
-          }
-
-          @Override
-          public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
-            return null;
-          }
-
-          @Override
-          public Class<? extends PipelineRunner<?>> getRunner() {
-            return null;
-          }
-
-          @Override
-          public void setRunner(Class<? extends PipelineRunner<?>> kls) {
-
-          }
-
-          @Override
-          public CheckEnabled getStableUniqueNames() {
-            return null;
-          }
-
-          @Override
-          public void setStableUniqueNames(CheckEnabled enabled) {
-          }
-        };
-      }
-      return options;
-    }
-
-    @Override
-    public void output(KV<K, VOUT> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
-
-        @Override
-        public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          // TODO: No need to represent timestamp twice.
-          collector.setAbsoluteTimestamp(timestamp.getMillis());
-          collector.collect(WindowedValue.of(output, timestamp, windows, pane));
-
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // Side outputs are not supported here, so fail loudly instead of ignoring them. This can
-      // happen when a user does not register side outputs but then outputs using a freshly created TupleTag.
-      throw new RuntimeException("sideOutput() is not available when grouping by window.");
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Accumulator acc = getRuntimeContext().getAccumulator(name);
-      if (acc != null) {
-        AccumulatorHelper.compareAccumulatorTypes(name,
-            SerializableFnAggregatorWrapper.class, acc.getClass());
-        return (Aggregator<AggInputT, AggOutputT>) acc;
-      }
-
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
-          new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, accumulator);
-      return accumulator;
-    }
-  }
-
-  //////////////        Checkpointing implementation        ////////////////
-
-  @Override
-  public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-    AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-    // checkpoint the timers
-    StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
-
-    // checkpoint the state
-    StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
-
-    // checkpoint the timerInternals
-    context.timerInternals.encodeTimerInternals(context, writer,
-        inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
-
-    taskState.setOperatorState(out.closeAndGetHandle());
-    return taskState;
-  }
-
-  @Override
-  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-    super.restoreState(taskState, recoveryTimestamp);
-
-    final ClassLoader userClassloader = getUserCodeClassloader();
-
-    Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-    @SuppressWarnings("unchecked")
-    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-    DataInputView in = inputState.getState(userClassloader);
-    StateCheckpointReader reader = new StateCheckpointReader(in);
-
-    // restore the timers
-    this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
-
-    // restore the state
-    this.perKeyStateInternals = StateCheckpointUtils.decodeState(
-        reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
-
-    // restore the timerInternals.
-    this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
deleted file mode 100644
index 61953a6..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-
-/**
- * This class groups the elements by key. It assumes that the incoming stream
- * is already composed of <code>[Key,Value]</code> pairs.
- * */
-public class FlinkGroupByKeyWrapper {
-
-  /**
-   * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement
-   * multiple interfaces.
-   */
-  private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
-  }
-
-  public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
-    final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-    final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
-    final boolean isKeyVoid = keyCoder instanceof VoidCoder;
-
-    return inputDataStream.keyBy(
-        new KeySelectorWithQueryableResultType<K, V>() {
-
-          @Override
-          public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
-            return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
-                value.getValue().getKey();
-          }
-
-          @Override
-          public TypeInformation<K> getProducedType() {
-            return keyTypeInfo;
-          }
-        });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
deleted file mode 100644
index cdf23f6..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.util.Map;
-
-/**
- * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation.
- * */
-public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
-
-  private final TupleTag<?> mainTag;
-  private final Map<TupleTag<?>, Integer> outputLabels;
-
-  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
-    super(options, windowingStrategy, doFn);
-    this.mainTag = Preconditions.checkNotNull(mainTag);
-    this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
-  }
-
-  @Override
-  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
-    checkTimestamp(inElement, timestamp);
-    Integer index = outputLabels.get(mainTag);
-    collector.collect(makeWindowedValue(
-        new RawUnionValue(index, output),
-        timestamp,
-        inElement.getWindows(),
-        inElement.getPane()));
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
-    checkTimestamp(inElement, timestamp);
-    Integer index = outputLabels.get(tag);
-    if (index != null) {
-      collector.collect(makeWindowedValue(
-          new RawUnionValue(index, output),
-          timestamp,
-          inElement.getWindows(),
-          inElement.getPane()));
-    }
-  }
-
-  @Override
-  public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
-    throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
-        "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
-        "is not available in this class.");
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
deleted file mode 100644
index 3357cd5..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.*;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation.
- * */
-public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
-
-  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
-    super(options, windowingStrategy, doFn);
-  }
-
-  @Override
-  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
-    checkTimestamp(inElement, timestamp);
-    collector.collect(makeWindowedValue(
-        output,
-        timestamp,
-        inElement.getWindows(),
-        inElement.getPane()));
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
-    // ignore the side output, this can happen when a user does not register
-    // side outputs but then outputs using a freshly created TupleTag.
-    throw new RuntimeException("sideOutput() not not available in ParDo.Bound().");
-  }
-
-  @Override
-  public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
-    return new WindowingInternals<IN, OUT>() {
-      @Override
-      public StateInternals stateInternals() {
-        throw new NullPointerException("StateInternals are not available for ParDo.Bound().");
-      }
-
-      @Override
-      public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-        collector.collect(makeWindowedValue(output, timestamp, windows, pane));
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new NullPointerException("TimeInternals are not available for ParDo.Bound().");
-      }
-
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return inElement.getWindows();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return inElement.getPane();
-      }
-
-      @Override
-      public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-        throw new RuntimeException("sideInput() not implemented.");
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
deleted file mode 100644
index 2599e88..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This flat map function bootstraps from collection elements and turns them into WindowedValues
- * (as required by the Flink runner).
- */
-public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> {
-
-  private final List<byte[]> elements;
-  private final Coder<OUT> coder;
-
-  public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-    this.elements = elements;
-    this.coder = coder;
-  }
-
-  @Override
-  public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
-
-    @SuppressWarnings("unchecked")
-    OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
-    for (byte[] element : elements) {
-      ByteArrayInputStream bai = new ByteArrayInputStream(element);
-      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-
-      if (outValue == null) {
-        out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-      } else {
-        out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-      }
-    }
-
-    out.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
deleted file mode 100644
index ddbc993..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into
- * unbounded Beam sources (see {@link UnboundedSource}).
- * */
-public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
-
-  private final PipelineOptions options;
-  private final RichParallelSourceFunction<T> flinkSource;
-
-  public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
-    if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
-      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-    }
-    options = Preconditions.checkNotNull(pipelineOptions);
-    flinkSource = Preconditions.checkNotNull(source);
-    validate();
-  }
-
-  public RichParallelSourceFunction<T> getFlinkSource() {
-    return this.flinkSource;
-  }
-
-  @Override
-  public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-  }
-
-  @Override
-  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-  }
-
-  @Nullable
-  @Override
-  public Coder<C> getCheckpointMarkCoder() {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-  }
-
-
-  @Override
-  public void validate() {
-    Preconditions.checkNotNull(options);
-    Preconditions.checkNotNull(flinkSource);
-    if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
-      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-    }
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
deleted file mode 100644
index a24964a..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging.
- * */
-public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
-
-  private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
-
-  private static final long serialVersionUID = 1L;
-
-  private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
-  private static final int CONNECTION_TIMEOUT_TIME = 0;
-
-  private final String hostname;
-  private final int port;
-  private final char delimiter;
-  private final long maxNumRetries;
-  private final long delayBetweenRetries;
-
-  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
-    this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
-  }
-
-  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
-    this.hostname = hostname;
-    this.port = port;
-    this.delimiter = delimiter;
-    this.maxNumRetries = maxNumRetries;
-    this.delayBetweenRetries = delayBetweenRetries;
-  }
-
-  public String getHostname() {
-    return this.hostname;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-
-  public char getDelimiter() {
-    return this.delimiter;
-  }
-
-  public long getMaxNumRetries() {
-    return this.maxNumRetries;
-  }
-
-  public long getDelayBetweenRetries() {
-    return this.delayBetweenRetries;
-  }
-
-  @Override
-  public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-    return Collections.<UnboundedSource<String, C>>singletonList(this);
-  }
-
-  @Override
-  public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
-    return new UnboundedSocketReader(this);
-  }
-
-  @Nullable
-  @Override
-  public Coder getCheckpointMarkCoder() {
-    // Flink and Dataflow have different checkpointing mechanisms.
-    // In our case we do not need a coder.
-    return null;
-  }
-
-  @Override
-  public void validate() {
-    checkArgument(port > 0 && port < 65536, "port is out of range");
-    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
-    checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
-  }
-
-  @Override
-  public Coder getDefaultOutputCoder() {
-    return DEFAULT_SOCKET_CODER;
-  }
-
-  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
-
-    private static final long serialVersionUID = 7526472295622776147L;
-    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
-
-    private final UnboundedSocketSource source;
-
-    private Socket socket;
-    private BufferedReader reader;
-
-    private boolean isRunning;
-
-    private String currentRecord;
-
-    public UnboundedSocketReader(UnboundedSocketSource source) {
-      this.source = source;
-    }
-
-    private void openConnection() throws IOException {
-      this.socket = new Socket();
-      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
-      this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
-      this.isRunning = true;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      int attempt = 0;
-      while (!isRunning) {
-        try {
-          openConnection();
-          LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
-
-          return advance();
-        } catch (IOException e) {
-          LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
-
-          if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
-            try {
-              Thread.sleep(this.source.getDelayBetweenRetries());
-            } catch (InterruptedException e1) {
-              e1.printStackTrace();
-            }
-          } else {
-            this.isRunning = false;
-            break;
-          }
-        }
-      }
-      LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
-      return false;
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      final StringBuilder buffer = new StringBuilder();
-      int data;
-      while (isRunning && (data = reader.read()) != -1) {
-        // check if the string is complete
-        if (data != this.source.getDelimiter()) {
-          buffer.append((char) data);
-        } else {
-          if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
-            buffer.setLength(buffer.length() - 1);
-          }
-          this.currentRecord = buffer.toString();
-          buffer.setLength(0);
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-      return new byte[0];
-    }
-
-    @Override
-    public String getCurrent() throws NoSuchElementException {
-      return this.currentRecord;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return Instant.now();
-    }
-
-    @Override
-    public void close() throws IOException {
-      this.reader.close();
-      this.socket.close();
-      this.isRunning = false;
-      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return Instant.now();
-    }
-
-    @Override
-    public CheckpointMark getCheckpointMark() {
-      return null;
-    }
-
-    @Override
-    public UnboundedSource<String, ?> getCurrentSource() {
-      return this.source;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
deleted file mode 100644
index 7c1ccdf..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.joda.time.Instant;
-
-/**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
- * interface.
- *
- *</p>
- * For now we support non-parallel, not checkpointed sources.
- * */
-public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
-
-  private final String name;
-  private final UnboundedSource.UnboundedReader<T> reader;
-
-  private StreamingRuntimeContext runtime = null;
-  private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
-
-  private volatile boolean isRunning = false;
-
-  public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
-    this.name = transform.getName();
-    this.reader = transform.getSource().createReader(options, null);
-  }
-
-  public String getName() {
-    return this.name;
-  }
-
-  WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
-    if (timestamp == null) {
-      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-    return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-  }
-
-  @Override
-  public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
-    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-      throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
-          "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
-    }
-
-    context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
-    runtime = (StreamingRuntimeContext) getRuntimeContext();
-
-    this.isRunning = true;
-    boolean inputAvailable = reader.start();
-
-    setNextWatermarkTimer(this.runtime);
-
-    while (isRunning) {
-
-      while (!inputAvailable && isRunning) {
-        // wait a bit until we retry to pull more records
-        Thread.sleep(50);
-        inputAvailable = reader.advance();
-      }
-
-      if (inputAvailable) {
-
-        // get it and its timestamp from the source
-        T item = reader.getCurrent();
-        Instant timestamp = reader.getCurrentTimestamp();
-
-        // write it to the output collector
-        synchronized (ctx.getCheckpointLock()) {
-          context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
-        }
-
-        inputAvailable = reader.advance();
-      }
-
-    }
-  }
-
-  @Override
-  public void cancel() {
-    isRunning = false;
-  }
-
-  @Override
-  public void trigger(long timestamp) throws Exception {
-    if (this.isRunning) {
-      synchronized (context.getCheckpointLock()) {
-        long watermarkMillis = this.reader.getWatermark().getMillis();
-        context.emitWatermark(new Watermark(watermarkMillis));
-      }
-      setNextWatermarkTimer(this.runtime);
-    }
-  }
-
-  private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
-    if (this.isRunning) {
-      long watermarkInterval =  runtime.getExecutionConfig().getAutoWatermarkInterval();
-      long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
-      runtime.registerTimer(timeToNextWatermark, this);
-    }
-  }
-
-  private long getTimeToNextWaternark(long watermarkInterval) {
-    return System.currentTimeMillis() + watermarkInterval;
-  }
-}


Mime
View raw message