beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [31/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package
Date Wed, 19 Apr 2017 19:15:05 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..ba00036
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class that instantiates and manages the execution of a given job.
+ * Depending on if the job is a Streaming or Batch processing one, it creates
+ * the adequate execution environment ({@link ExecutionEnvironment}
+ * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
+ * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
+ * transform the Beam job into a Flink one, and executes the (translated) job.
+ */
+class FlinkPipelineExecutionEnvironment {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+  private final FlinkPipelineOptions options;
+
+  /**
+   * The Flink Batch execution environment. This is instantiated to either a
+   * {@link org.apache.flink.api.java.CollectionEnvironment},
+   * a {@link org.apache.flink.api.java.LocalEnvironment} or
+   * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+   * options.
+   */
+  private ExecutionEnvironment flinkBatchEnv;
+
+  /**
+   * The Flink Streaming execution environment. This is instantiated to either a
+   * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+   * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+   * on the configuration options, and more specifically, the url of the master.
+   */
+  private StreamExecutionEnvironment flinkStreamEnv;
+
+  /**
+   * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters
in the
+   * provided {@link FlinkPipelineOptions}.
+   *
+   * @param options the user-defined pipeline options.
+   * */
+  FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+    this.options = checkNotNull(options);
+  }
+
+  /**
+   * Depending on if the job is a Streaming or a Batch one, this method creates
+   * the necessary execution environment and pipeline translator, and translates
+   * the {@link org.apache.beam.sdk.values.PCollection} program into
+   * a {@link org.apache.flink.api.java.DataSet}
+   * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
+   * */
+  public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
+    this.flinkBatchEnv = null;
+    this.flinkStreamEnv = null;
+
+    PipelineTranslationOptimizer optimizer =
+        new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
+
+    optimizer.translate(pipeline);
+    TranslationMode translationMode = optimizer.getTranslationMode();
+
+    FlinkPipelineTranslator translator;
+    if (translationMode == TranslationMode.STREAMING) {
+      this.flinkStreamEnv = createStreamExecutionEnvironment();
+      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
+    } else {
+      this.flinkBatchEnv = createBatchExecutionEnvironment();
+      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+    }
+
+    translator.translate(pipeline);
+  }
+
+  /**
+   * Launches the program execution.
+   * */
+  public JobExecutionResult executePipeline() throws Exception {
+    final String jobName = options.getJobName();
+
+    if (flinkBatchEnv != null) {
+      return flinkBatchEnv.execute(jobName);
+    } else if (flinkStreamEnv != null) {
+      return flinkStreamEnv.execute(jobName);
+    } else {
+      throw new IllegalStateException("The Pipeline has not yet been translated.");
+    }
+  }
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the adequate
+   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private ExecutionEnvironment createBatchExecutionEnvironment() {
+
+    LOG.info("Creating the required Batch Execution Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    ExecutionEnvironment flinkBatchEnv;
+
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[collection]")) {
+      flinkBatchEnv = new CollectionEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]),
+          stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment))
{
+      flinkBatchEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkBatchEnv.getParallelism());
+
+    if (options.getObjectReuse()) {
+      flinkBatchEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkBatchEnv.getConfig().disableObjectReuse();
+    }
+
+    return flinkBatchEnv;
+  }
+
+  /**
+   * If the submitted job is a stream processing job, this method creates the adequate
+   * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}
depending
+   * on the user-specified options.
+   */
+  private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+
+    LOG.info("Creating the required Streaming Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    StreamExecutionEnvironment flinkStreamEnv = null;
+
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1) {
+      flinkStreamEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkStreamEnv.getParallelism());
+
+    if (options.getObjectReuse()) {
+      flinkStreamEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkStreamEnv.getConfig().disableObjectReuse();
+    }
+
+    // default to event time
+    flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // for the following 2 parameters, a value of -1 means that Flink will use
+    // the default values as specified in the configuration.
+    int numRetries = options.getNumberOfExecutionRetries();
+    if (numRetries != -1) {
+      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+    }
+    long retryDelay = options.getExecutionRetryDelay();
+    if (retryDelay != -1) {
+      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+    }
+
+    // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+    // If the value is not -1, then the validity checks are applied.
+    // By default, checkpointing is disabled.
+    long checkpointInterval = options.getCheckpointingInterval();
+    if (checkpointInterval != -1) {
+      if (checkpointInterval < 1) {
+        throw new IllegalArgumentException("The checkpoint interval must be positive");
+      }
+      flinkStreamEnv.enableCheckpointing(checkpointInterval);
+    }
+
+    // State backend
+    final AbstractStateBackend stateBackend = options.getStateBackend();
+    if (stateBackend != null) {
+      flinkStreamEnv.setStateBackend(stateBackend);
+    }
+
+    return flinkStreamEnv;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
new file mode 100644
index 0000000..ef9afea
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+/**
+ * Options which can be used to configure a Flink PipelineRunner.
+ */
+public interface FlinkPipelineOptions
+    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+  /**
+   * List of local files to make available to workers.
+   *
+   * <p>Jars are placed on the worker's classpath.
+   *
+   * <p>The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Jar-Files to send to all workers and put on the classpath. "
+      + "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * The url of the Flink JobManager on which to execute pipelines. This can either be
+   * the the address of a cluster JobManager, in the form "host:port" or one of the special
+   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
+   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
+   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
+   */
+  @Description("Address of the Flink Master where the Pipeline should be executed. Can"
+      + " either be of the form \"host:port\" or one of the special values [local], "
+      + "[collection] or [auto].")
+  String getFlinkMaster();
+  void setFlinkMaster(String value);
+
+  @Description("The degree of parallelism to be used when distributing operations onto workers.")
+  @Default.InstanceFactory(DefaultParallelismFactory.class)
+  Integer getParallelism();
+  void setParallelism(Integer value);
+
+  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current"
+      + "pipeline state used for fault tolerance).")
+  @Default.Long(-1L)
+  Long getCheckpointingInterval();
+  void setCheckpointingInterval(Long interval);
+
+  @Description("Sets the number of times that failed tasks are re-executed. "
+      + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
+      + "that the system default value (as defined in the configuration) should be used.")
+  @Default.Integer(-1)
+  Integer getNumberOfExecutionRetries();
+  void setNumberOfExecutionRetries(Integer retries);
+
+  @Description("Sets the delay between executions. A value of {@code -1} "
+      + "indicates that the default value should be used.")
+  @Default.Long(-1L)
+  Long getExecutionRetryDelay();
+  void setExecutionRetryDelay(Long delay);
+
+  @Description("Sets the behavior of reusing objects.")
+  @Default.Boolean(false)
+  Boolean getObjectReuse();
+  void setObjectReuse(Boolean reuse);
+
+  /**
+   * State backend to store Beam's state during computation.
+   * Note: Only applicable when executing in streaming mode.
+   */
+  @Description("Sets the state backend to use in streaming mode. "
+      + "Otherwise the default is read from the Flink config.")
+  @JsonIgnore
+  AbstractStateBackend getStateBackend();
+  void setStateBackend(AbstractStateBackend stateBackend);
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
new file mode 100644
index 0000000..65f416d
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+
+/**
+ * The role of this class is to translate the Beam operators to
+ * their Flink counterparts. If we have a streaming job, this is instantiated as a
+ * {@link FlinkStreamingPipelineTranslator}. In other case, i.e. for a batch job,
+ * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
+ * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into
+ * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
+ * {@link org.apache.flink.api.java.DataSet} (for batch) one.
+ */
+abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+  /**
+   * Translates the pipeline by passing this class as a visitor.
+   * @param pipeline The pipeline to be translated
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  /**
+   * Utility formatting method.
+   * @param n number of spaces to generate
+   * @return String with "|" followed by n spaces
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
+    }
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
new file mode 100644
index 0000000..096f030
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ */
+public class FlinkRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
+
+  /**
+   * Provided options.
+   */
+  private final FlinkPipelineOptions options;
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static FlinkRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    ArrayList<String> missing = new ArrayList<>();
+
+    if (flinkOptions.getAppName() == null) {
+      missing.add("appName");
+    }
+    if (missing.size() > 0) {
+      throw new IllegalArgumentException(
+          "Missing required values: " + Joiner.on(',').join(missing));
+    }
+
+    if (flinkOptions.getFilesToStage() == null) {
+      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+          FlinkRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          flinkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+    }
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    return new FlinkRunner(flinkOptions);
+  }
+
+  private FlinkRunner(FlinkPipelineOptions options) {
+    this.options = options;
+    this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+
+    LOG.info("Executing pipeline using FlinkRunner.");
+
+    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
+
+    LOG.info("Translating pipeline to Flink program.");
+    env.translate(this, pipeline);
+
+    JobExecutionResult result;
+    try {
+      LOG.info("Starting execution of Flink program.");
+      result = env.executePipeline();
+    } catch (Exception e) {
+      LOG.error("Pipeline execution failed", e);
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+
+    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+      LOG.info("Pipeline submitted in Detached mode");
+      return new FlinkDetachedRunnerResult();
+    } else {
+      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+      Map<String, Object> accumulators = result.getAllAccumulatorResults();
+      if (accumulators != null && !accumulators.isEmpty()) {
+        LOG.info("Final aggregator values:");
+
+        for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet())
{
+          LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        }
+      }
+
+      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+    }
+  }
+
+  /**
+   * For testing.
+   */
+  public FlinkPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  @Override
+  public String toString() {
+    return "FlinkRunner#" + hashCode();
+  }
+
+  /**
+   * Attempts to detect all the resources the class loader has access to. This does not recurse
+   * to class loader parents stopping it from pulling in resources from the system class
loader.
+   *
+   * @param classLoader The URLClassLoader to use to detect resources to stage.
+   * @return A list of absolute paths to the resources the class loader uses.
+   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader
or one
+   *   of the resources the class loader exposes is not a file resource.
+   */
+  protected static List<String> detectClassPathResourcesToStage(
+      ClassLoader classLoader) {
+    if (!(classLoader instanceof URLClassLoader)) {
+      String message = String.format("Unable to use ClassLoader to detect classpath elements.
"
+          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
+      LOG.error(message);
+      throw new IllegalArgumentException(message);
+    }
+
+    List<String> files = new ArrayList<>();
+    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+      try {
+        files.add(new File(url.toURI()).getAbsolutePath());
+      } catch (IllegalArgumentException | URISyntaxException e) {
+        String message = String.format("Unable to convert url (%s) to file.", url);
+        LOG.error(message);
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+    return files;
+  }
+
+  /** A set of {@link View}s with non-deterministic key coders. */
+  Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
+
+  /**
+   * Records that the {@link PTransform} requires a deterministic key coder.
+   */
+  void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
+    ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
+  }
+
+  /** Outputs a warning about PCollection views without deterministic key coders. */
+  private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline)
{
+    // We need to wait till this point to determine the names of the transforms since only
+    // at this time do we know the hierarchy of the transforms otherwise we could
+    // have just recorded the full names during apply time.
+    if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
+      final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new
TreeSet<>();
+      pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
+        @Override
+        public void visitValue(PValue value, TransformHierarchy.Node producer) {
+        }
+
+        @Override
+        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform()))
{
+            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+          }
+        }
+
+        @Override
+        public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform()))
{
+            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+          }
+          return CompositeBehavior.ENTER_TRANSFORM;
+        }
+
+        @Override
+        public void leaveCompositeTransform(TransformHierarchy.Node node) {
+        }
+      });
+
+      LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for
{} "
+          + "because the key coder is not deterministic. Falling back to singleton implementation
"
+          + "which may cause memory and/or performance problems. Future major versions of
"
+          + "the Flink runner will require deterministic key coders.",
+          ptransformViewNamesWithNonDeterministicKeyCoders);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 0000000..681459a
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+
+/**
+ * AutoService registrar - will register FlinkRunner and FlinkOptions
+ * as possible pipeline runner services.
+ *
+ * <p>It ends up in META-INF/services and gets picked up by Beam.
+ *
+ */
+public class FlinkRunnerRegistrar {
+  private FlinkRunnerRegistrar() { }
+
+  /**
+   * Pipeline runner registrar.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners()
{
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          FlinkRunner.class,
+          TestFlinkRunner.class);
+    }
+  }
+
+  /**
+   * Pipeline options registrar.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
new file mode 100644
index 0000000..0682b56
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+/**
+ * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This
+ * has methods to query to job runtime and the final values of
+ * {@link org.apache.beam.sdk.transforms.Aggregator}s.
+ */
+public class FlinkRunnerResult implements PipelineResult {
+
+  private final Map<String, Object> aggregators;
+
+  private final long runtime;
+
+  FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+    this.aggregators = (aggregators == null || aggregators.isEmpty())
+        ? Collections.<String, Object>emptyMap()
+        : Collections.unmodifiableMap(aggregators);
+    this.runtime = runtime;
+  }
+
+  @Override
+  public State getState() {
+    return State.DONE;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T>
aggregator)
+      throws AggregatorRetrievalException {
+    // TODO provide a list of all accumulator step values
+    Object value = aggregators.get(aggregator.getName());
+    if (value != null) {
+      return new AggregatorValues<T>() {
+        @Override
+        public Map<String, T> getValuesAtSteps() {
+          return (Map<String, T>) aggregators;
+        }
+      };
+    } else {
+      throw new AggregatorRetrievalException("Accumulator results not found.",
+          new RuntimeException("Accumulator does not exist."));
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "FlinkRunnerResult{"
+        + "aggregators=" + aggregators
+        + ", runtime=" + runtime
+        + '}';
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel.");
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.DONE;
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
new file mode 100644
index 0000000..0459ef7
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
+ * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
+ *
+ */
+class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
+
+  /** The necessary context in the case of a straming job. */
+  private final FlinkStreamingTranslationContext streamingContext;
+
+  private int depth = 0;
+
+  private FlinkRunner flinkRunner;
+
+  public FlinkStreamingPipelineTranslator(
+      FlinkRunner flinkRunner,
+      StreamExecutionEnvironment env,
+      PipelineOptions options) {
+    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
+    this.flinkRunner = flinkRunner;
+  }
+
+  @Override
+  public void translate(Pipeline pipeline) {
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.splittableParDoMulti(),
+                    new SplittableParDoOverrideFactory()))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsIterable.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsList.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
+            // this has to be last since the ViewAsSingleton override
+            // can expand to a Combine.GloballyAsSingletonView
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+                        flinkRunner)))
+            .build();
+
+    pipeline.replaceAll(transformOverrides);
+    super.translate(pipeline);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // --------------------------------------------------------------------------------------------
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+    this.depth++;
+
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      StreamTransformTranslator<?> translator =
+          FlinkStreamingTransformTranslators.getTranslator(transform);
+
+      if (translator != null && applyCanTranslate(transform, node, translator)) {
+        applyStreamingTransform(transform, node, translator);
+        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+      }
+    }
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    this.depth--;
+    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
+    // get the transformation corresponding to hte node we are
+    // currently visiting and translate it into its Flink alternative.
+
+    PTransform<?, ?> transform = node.getTransform();
+    StreamTransformTranslator<?> translator =
+        FlinkStreamingTransformTranslators.getTranslator(transform);
+
+    if (translator == null || !applyCanTranslate(transform, node, translator)) {
+      LOG.info(node.getTransform().getClass().toString());
+      throw new UnsupportedOperationException(
+          "The transform " + transform + " is currently not supported.");
+    }
+    applyStreamingTransform(transform, node, translator);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
+    // do nothing here
+  }
+
+  private <T extends PTransform<?, ?>> void applyStreamingTransform(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      StreamTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>)
translator;
+
+    // create the applied PTransform on the streamingContext
+    streamingContext.setCurrentTransform(node.toAppliedPTransform());
+    typedTranslator.translateNode(typedTransform, streamingContext);
+  }
+
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      StreamTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>)
translator;
+
+    streamingContext.setCurrentTransform(node.toAppliedPTransform());
+
+    return typedTranslator.canTranslate(typedTransform, streamingContext);
+  }
+
+  /**
+   * The interface that every Flink translator of a Beam operator should implement.
+   * This interface is for <b>streaming</b> jobs. For examples of such translators
see
+   * {@link FlinkStreamingTransformTranslators}.
+   */
+  abstract static class StreamTransformTranslator<T extends PTransform> {
+
+    /**
+     * Translate the given transform.
+     */
+    abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
+
+    /**
+     * Returns true iff this translator can translate the given transform.
+     */
+    boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
+      return true;
+    }
+  }
+
+  private static class ReflectiveOneToOneOverrideFactory<
+          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<InputT>, PCollection<OutputT>, TransformT> {
+    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>
replacement;
+    private final FlinkRunner runner;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>
replacement,
+        FlinkRunner runner) {
+      this.replacement = replacement;
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>>
getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT>
transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          InstanceBuilder.ofType(replacement)
+              .withArg(FlinkRunner.class, runner)
+              .withArg(
+                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
+                      transform.getTransform().getClass(),
+                  transform.getTransform())
+              .build());
+    }
+  }
+
+  /**
+   * A {@link PTransformOverrideFactory} that overrides a <a
+   * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link
SplittableParDo}.
+   */
+  static class SplittableParDoOverrideFactory<InputT, OutputT>
+      implements PTransformOverrideFactory<
+          PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
{
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PCollectionTuple>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new SplittableParDo<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
+      return ReplacementOutputs.tagged(outputs, newOutput);
+    }
+  }
+}


Mime
View raw message