From: davor@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 14 Apr 2016 04:48:22 -0000
Message-Id: <01e6c5b4b0bc4c56b2c895a66914a381@git.apache.org>
Subject: [35/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
deleted file mode 100644
index 8f8c653..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners; - -import org.apache.beam.sdk.annotations.Experimental; - -import com.google.api.services.dataflow.model.Environment; - -/** - * An instance of this class can be passed to the - * {@link DataflowPipelineRunner} to add user defined hooks to be - * invoked at various times during pipeline execution. - */ -@Experimental -public class DataflowPipelineRunnerHooks { - /** - * Allows the user to modify the environment of their job before their job is submitted - * to the service for execution. - * - * @param environment The environment of the job. Users can make change to this instance in order - * to change the environment with which their job executes on the service. - */ - public void modifyEnvironmentBeforeSubmission(Environment environment) {} -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java deleted file mode 100644 index fa7067e..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ /dev/null @@ -1,1100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.runners; - -import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; -import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray; -import static com.google.cloud.dataflow.sdk.util.StringUtils.byteArrayToJsonString; -import static com.google.cloud.dataflow.sdk.util.StringUtils.jsonStringToByteArray; -import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean; -import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary; -import static com.google.cloud.dataflow.sdk.util.Structs.addList; -import static com.google.cloud.dataflow.sdk.util.Structs.addLong; -import static com.google.cloud.dataflow.sdk.util.Structs.addObject; -import static com.google.cloud.dataflow.sdk.util.Structs.addString; -import static com.google.cloud.dataflow.sdk.util.Structs.getString; -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.services.dataflow.model.AutoscalingSettings; -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.api.services.dataflow.model.Disk; -import com.google.api.services.dataflow.model.Environment; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.Step; -import com.google.api.services.dataflow.model.WorkerPool; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderException; -import com.google.cloud.dataflow.sdk.coders.IterableCoder; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.StreamingOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; -import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator; -import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; -import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; -import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.DoFnInfo; -import com.google.cloud.dataflow.sdk.util.OutputReference; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.SerializableUtils; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.PCollection; -import 
com.google.cloud.dataflow.sdk.values.PCollectionTuple; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.cloud.dataflow.sdk.values.PValue; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.cloud.dataflow.sdk.values.TypedPValue; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - -/** - * {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects - * into Cloud Dataflow Service API {@link Job}s. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class DataflowPipelineTranslator { - // Must be kept in sync with their internal counterparts. - private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - - /** - * A map from {@link PTransform} subclass to the corresponding - * {@link TransformTranslator} to use to translate that transform. - * - *
<p>
A static map that contains system-wide defaults. - */ - private static Map transformTranslators = - new HashMap<>(); - - /** Provided configuration options. */ - private final DataflowPipelineOptions options; - - /** - * Constructs a translator from the provided options. - * - * @param options Properties that configure the translator. - * - * @return The newly created translator. - */ - public static DataflowPipelineTranslator fromOptions( - DataflowPipelineOptions options) { - return new DataflowPipelineTranslator(options); - } - - private DataflowPipelineTranslator(DataflowPipelineOptions options) { - this.options = options; - } - - /** - * Translates a {@link Pipeline} into a {@code JobSpecification}. - */ - public JobSpecification translate( - Pipeline pipeline, - DataflowPipelineRunner runner, - List packages) { - - Translator translator = new Translator(pipeline, runner); - Job result = translator.translate(packages); - return new JobSpecification(result, Collections.unmodifiableMap(translator.stepNames)); - } - - /** - * The result of a job translation. - * - *
<p>
Used to pass the result {@link Job} and any state that was used to construct the job that - * may be of use to other classes (eg the {@link PTransform} to StepName mapping). - */ - public static class JobSpecification { - private final Job job; - private final Map, String> stepNames; - - public JobSpecification(Job job, Map, String> stepNames) { - this.job = job; - this.stepNames = stepNames; - } - - public Job getJob() { - return job; - } - - /** - * Returns the mapping of {@link AppliedPTransform AppliedPTransforms} to the internal step - * name for that {@code AppliedPTransform}. - */ - public Map, String> getStepNames() { - return stepNames; - } - } - - /** - * Renders a {@link Job} as a string. - */ - public static String jobToString(Job job) { - try { - return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job); - } catch (JsonProcessingException exc) { - throw new IllegalStateException("Failed to render Job as String.", exc); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Records that instances of the specified PTransform class - * should be translated by default by the corresponding - * {@link TransformTranslator}. - */ - public static void registerTransformTranslator( - Class transformClass, - TransformTranslator transformTranslator) { - if (transformTranslators.put(transformClass, transformTranslator) != null) { - throw new IllegalArgumentException( - "defining multiple translators for " + transformClass); - } - } - - /** - * Returns the {@link TransformTranslator} to use for instances of the - * specified PTransform class, or null if none registered. - */ - public - TransformTranslator getTransformTranslator(Class transformClass) { - return transformTranslators.get(transformClass); - } - - /** - * A {@link TransformTranslator} knows how to translate - * a particular subclass of {@link PTransform} for the - * Cloud Dataflow service. It does so by - * mutating the {@link TranslationContext}. - */ - public interface TransformTranslator { - public void translate(TransformT transform, - TranslationContext context); - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@link DataflowPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and side inputs ({@link PCollectionView}s). - */ - public interface TranslationContext { - /** - * Returns the configured pipeline options. - */ - DataflowPipelineOptions getPipelineOptions(); - - /** - * Returns the input of the currently being translated transform. - */ - InputT getInput(PTransform transform); - - /** - * Returns the output of the currently being translated transform. - */ - OutputT getOutput(PTransform transform); - - /** - * Returns the full name of the currently being translated transform. - */ - String getFullName(PTransform transform); - - /** - * Adds a step to the Dataflow workflow for the given transform, with - * the given Dataflow step type. - * This step becomes "current" for the purpose of {@link #addInput} and - * {@link #addOutput}. - */ - public void addStep(PTransform transform, String type); - - /** - * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be - * consistent with the Step, in terms of input, output and coder types. - * - *
<p>
This is a low-level operation, when using this method it is up to - * the caller to ensure that names do not collide. - */ - public void addStep(PTransform transform, Step step); - - /** - * Sets the encoding for the current Dataflow step. - */ - public void addEncodingInput(Coder value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - public void addInput(String name, Boolean value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - public void addInput(String name, String value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - public void addInput(String name, Long value); - - /** - * Adds an input with the given name to the previously added Dataflow - * step, coming from the specified input PValue. - */ - public void addInput(String name, PInput value); - - /** - * Adds an input that is a dictionary of strings to objects. - */ - public void addInput(String name, Map elements); - - /** - * Adds an input that is a list of objects. - */ - public void addInput(String name, List> elements); - - /** - * Adds an output with the given name to the previously added - * Dataflow step, producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the - * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code WindowedValueCoder}. - */ - public void addOutput(String name, PValue value); - - /** - * Adds an output with the given name to the previously added - * Dataflow step, producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the - * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code ValueOnlyCoder}. - */ - public void addValueOnlyOutput(String name, PValue value); - - /** - * Adds an output with the given name to the previously added - * CollectionToSingleton Dataflow step, consuming the specified - * input {@code PValue} and producing the specified output - * {@code PValue}. This step requires special treatment for its - * output encoding. - */ - public void addCollectionToSingletonOutput(String name, - PValue inputValue, - PValue outputValue); - - /** - * Encode a PValue reference as an output reference. - */ - public OutputReference asOutputReference(PValue value); - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Translates a Pipeline into the Dataflow representation. - */ - class Translator implements PipelineVisitor, TranslationContext { - /** The Pipeline to translate. */ - private final Pipeline pipeline; - - /** The runner which will execute the pipeline. */ - private final DataflowPipelineRunner runner; - - /** The Cloud Dataflow Job representation. */ - private final Job job = new Job(); - - /** - * Translator is stateful, as addProperty calls refer to the current step. - */ - private Step currentStep; - - /** - * A Map from AppliedPTransform to their unique Dataflow step names. - */ - private final Map, String> stepNames = new HashMap<>(); - - /** - * A Map from PValues to their output names used by their producer - * Dataflow steps. - */ - private final Map outputNames = new HashMap<>(); - - /** - * A Map from PValues to the Coders used for them. - */ - private final Map> outputCoders = new HashMap<>(); - - /** - * The transform currently being applied. 
- */ - private AppliedPTransform currentTransform; - - /** - * Constructs a Translator that will translate the specified - * Pipeline into Dataflow objects. - */ - public Translator(Pipeline pipeline, DataflowPipelineRunner runner) { - this.pipeline = pipeline; - this.runner = runner; - } - - /** - * Translates this Translator's pipeline onto its writer. - * @return a Job definition filled in with the type of job, the environment, - * and the job steps. - */ - public Job translate(List packages) { - job.setName(options.getJobName().toLowerCase()); - - Environment environment = new Environment(); - job.setEnvironment(environment); - - try { - environment.setSdkPipelineOptions( - MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class)); - } catch (IOException e) { - throw new IllegalArgumentException( - "PipelineOptions specified failed to serialize to JSON.", e); - } - - WorkerPool workerPool = new WorkerPool(); - - if (options.getTeardownPolicy() != null) { - workerPool.setTeardownPolicy(options.getTeardownPolicy().getTeardownPolicyName()); - } - - if (options.isStreaming()) { - job.setType("JOB_TYPE_STREAMING"); - } else { - job.setType("JOB_TYPE_BATCH"); - workerPool.setDiskType(options.getWorkerDiskType()); - } - - if (options.getWorkerMachineType() != null) { - workerPool.setMachineType(options.getWorkerMachineType()); - } - - workerPool.setPackages(packages); - workerPool.setNumWorkers(options.getNumWorkers()); - - if (options.isStreaming()) { - // Use separate data disk for streaming. - Disk disk = new Disk(); - disk.setDiskType(options.getWorkerDiskType()); - workerPool.setDataDisks(Collections.singletonList(disk)); - } - if (!Strings.isNullOrEmpty(options.getZone())) { - workerPool.setZone(options.getZone()); - } - if (!Strings.isNullOrEmpty(options.getNetwork())) { - workerPool.setNetwork(options.getNetwork()); - } - if (!Strings.isNullOrEmpty(options.getSubnetwork())) { - workerPool.setSubnetwork(options.getSubnetwork()); - } - if (options.getDiskSizeGb() > 0) { - workerPool.setDiskSizeGb(options.getDiskSizeGb()); - } - AutoscalingSettings settings = new AutoscalingSettings(); - if (options.getAutoscalingAlgorithm() != null) { - settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm()); - } - settings.setMaxNumWorkers(options.getMaxNumWorkers()); - workerPool.setAutoscalingSettings(settings); - - List workerPools = new LinkedList<>(); - - workerPools.add(workerPool); - environment.setWorkerPools(workerPools); - - pipeline.traverseTopologically(this); - return job; - } - - @Override - public DataflowPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public InputT getInput(PTransform transform) { - return (InputT) getCurrentTransform(transform).getInput(); - } - - @Override - public OutputT getOutput(PTransform transform) { - return (OutputT) getCurrentTransform(transform).getOutput(); - } - - @Override - public String getFullName(PTransform transform) { - return getCurrentTransform(transform).getFullName(); - } - - private AppliedPTransform getCurrentTransform(PTransform transform) { - checkArgument( - currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return currentTransform; - } - - @Override - public void enterCompositeTransform(TransformTreeNode node) { - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - } - - @Override - public void visitTransform(TransformTreeNode node) { - PTransform transform = node.getTransform(); - 
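// Aside, not from the original commit: the lookup just below only succeeds for
// transform classes that were previously installed through the static
// registerTransformTranslator method. A minimal, hedged sketch of such a
// registration for a hypothetical MyTransform follows; a real translator for a
// "ParallelDo" step would also serialize the user fn, as the ParDo translators
// registered further down do.

DataflowPipelineTranslator.registerTransformTranslator(
    MyTransform.class,  // MyTransform is invented for illustration
    new DataflowPipelineTranslator.TransformTranslator<MyTransform>() {
      @Override
      public void translate(
          MyTransform transform, DataflowPipelineTranslator.TranslationContext context) {
        // Open a step, then wire its input and output through the context.
        context.addStep(transform, "ParallelDo");
        context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
        context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
      }
    });
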
TransformTranslator translator = - getTransformTranslator(transform.getClass()); - if (translator == null) { - throw new IllegalStateException( - "no translator registered for " + transform); - } - LOG.debug("Translating {}", transform); - currentTransform = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform); - translator.translate(transform, this); - currentTransform = null; - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - LOG.debug("Checking translation of {}", value); - if (value.getProducingTransformInternal() == null) { - throw new RuntimeException( - "internal error: expecting a PValue " - + "to have a producingTransform"); - } - if (!producer.isCompositeNode()) { - // Primitive transforms are the only ones assigned step names. - asOutputReference(value); - } - } - - @Override - public void addStep(PTransform transform, String type) { - String stepName = genStepName(); - if (stepNames.put(getCurrentTransform(transform), stepName) != null) { - throw new IllegalArgumentException( - transform + " already has a name specified"); - } - // Start the next "steps" list item. - List steps = job.getSteps(); - if (steps == null) { - steps = new LinkedList<>(); - job.setSteps(steps); - } - - currentStep = new Step(); - currentStep.setName(stepName); - currentStep.setKind(type); - steps.add(currentStep); - addInput(PropertyNames.USER_NAME, getFullName(transform)); - addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform)); - } - - @Override - public void addStep(PTransform transform, Step original) { - Step step = original.clone(); - String stepName = step.getName(); - if (stepNames.put(getCurrentTransform(transform), stepName) != null) { - throw new IllegalArgumentException(transform + " already has a name specified"); - } - - Map properties = step.getProperties(); - if (properties != null) { - @Nullable List> outputInfoList = null; - try { - // TODO: This should be done via a Structs accessor. 
- @Nullable List> list = - (List>) properties.get(PropertyNames.OUTPUT_INFO); - outputInfoList = list; - } catch (Exception e) { - throw new RuntimeException("Inconsistent dataflow pipeline translation", e); - } - if (outputInfoList != null && outputInfoList.size() > 0) { - Map firstOutputPort = outputInfoList.get(0); - @Nullable String name; - try { - name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME); - } catch (Exception e) { - name = null; - } - if (name != null) { - registerOutputName(getOutput(transform), name); - } - } - } - - List steps = job.getSteps(); - if (steps == null) { - steps = new LinkedList<>(); - job.setSteps(steps); - } - currentStep = step; - steps.add(step); - } - - @Override - public void addEncodingInput(Coder coder) { - CloudObject encoding = SerializableUtils.ensureSerializable(coder); - addObject(getProperties(), PropertyNames.ENCODING, encoding); - } - - @Override - public void addInput(String name, Boolean value) { - addBoolean(getProperties(), name, value); - } - - @Override - public void addInput(String name, String value) { - addString(getProperties(), name, value); - } - - @Override - public void addInput(String name, Long value) { - addLong(getProperties(), name, value); - } - - @Override - public void addInput(String name, Map elements) { - addDictionary(getProperties(), name, elements); - } - - @Override - public void addInput(String name, List> elements) { - addList(getProperties(), name, elements); - } - - @Override - public void addInput(String name, PInput value) { - if (value instanceof PValue) { - addInput(name, asOutputReference((PValue) value)); - } else { - throw new IllegalStateException("Input must be a PValue"); - } - } - - @Override - public void addOutput(String name, PValue value) { - Coder coder; - if (value instanceof TypedPValue) { - coder = ((TypedPValue) value).getCoder(); - if (value instanceof PCollection) { - // Wrap the PCollection element Coder inside a WindowedValueCoder. - coder = WindowedValue.getFullCoder( - coder, - ((PCollection) value).getWindowingStrategy().getWindowFn().windowCoder()); - } - } else { - // No output coder to encode. - coder = null; - } - addOutput(name, value, coder); - } - - @Override - public void addValueOnlyOutput(String name, PValue value) { - Coder coder; - if (value instanceof TypedPValue) { - coder = ((TypedPValue) value).getCoder(); - if (value instanceof PCollection) { - // Wrap the PCollection element Coder inside a ValueOnly - // WindowedValueCoder. - coder = WindowedValue.getValueOnlyCoder(coder); - } - } else { - // No output coder to encode. - coder = null; - } - addOutput(name, value, coder); - } - - @Override - public void addCollectionToSingletonOutput(String name, - PValue inputValue, - PValue outputValue) { - Coder inputValueCoder = - Preconditions.checkNotNull(outputCoders.get(inputValue)); - // The inputValueCoder for the input PCollection should be some - // WindowedValueCoder of the input PCollection's element - // coder. - Preconditions.checkState( - inputValueCoder instanceof WindowedValue.WindowedValueCoder); - // The outputValueCoder for the output should be an - // IterableCoder of the inputValueCoder. This is a property - // of the backend "CollectionToSingleton" step. 
- Coder outputValueCoder = IterableCoder.of(inputValueCoder); - addOutput(name, outputValue, outputValueCoder); - } - - /** - * Adds an output with the given name to the previously added - * Dataflow step, producing the specified output {@code PValue} - * with the given {@code Coder} (if not {@code null}). - */ - private void addOutput(String name, PValue value, Coder valueCoder) { - registerOutputName(value, name); - - Map properties = getProperties(); - @Nullable List> outputInfoList = null; - try { - // TODO: This should be done via a Structs accessor. - outputInfoList = (List>) properties.get(PropertyNames.OUTPUT_INFO); - } catch (Exception e) { - throw new RuntimeException("Inconsistent dataflow pipeline translation", e); - } - if (outputInfoList == null) { - outputInfoList = new ArrayList<>(); - // TODO: This should be done via a Structs accessor. - properties.put(PropertyNames.OUTPUT_INFO, outputInfoList); - } - - Map outputInfo = new HashMap<>(); - addString(outputInfo, PropertyNames.OUTPUT_NAME, name); - addString(outputInfo, PropertyNames.USER_NAME, value.getName()); - if (value instanceof PCollection - && runner.doesPCollectionRequireIndexedFormat((PCollection) value)) { - addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true); - } - if (valueCoder != null) { - // Verify that encoding can be decoded, in order to catch serialization - // failures as early as possible. - CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder); - addObject(outputInfo, PropertyNames.ENCODING, encoding); - outputCoders.put(value, valueCoder); - } - - outputInfoList.add(outputInfo); - } - - private void addDisplayData(String name, DisplayData displayData) { - List> list = MAPPER.convertValue(displayData, List.class); - addList(getProperties(), name, list); - } - - @Override - public OutputReference asOutputReference(PValue value) { - AppliedPTransform transform = - value.getProducingTransformInternal(); - String stepName = stepNames.get(transform); - if (stepName == null) { - throw new IllegalArgumentException(transform + " doesn't have a name specified"); - } - - String outputName = outputNames.get(value); - if (outputName == null) { - throw new IllegalArgumentException( - "output " + value + " doesn't have a name specified"); - } - - return new OutputReference(stepName, outputName); - } - - private Map getProperties() { - Map properties = currentStep.getProperties(); - if (properties == null) { - properties = new HashMap<>(); - currentStep.setProperties(properties); - } - return properties; - } - - /** - * Returns a fresh Dataflow step name. - */ - private String genStepName() { - return "s" + (stepNames.size() + 1); - } - - /** - * Records the name of the given output PValue, - * within its producing transform. 
- */ - private void registerOutputName(POutput value, String name) { - if (outputNames.put(value, name) != null) { - throw new IllegalArgumentException( - "output " + value + " already has a name specified"); - } - } - } - - ///////////////////////////////////////////////////////////////////////////// - - @Override - public String toString() { - return "DataflowPipelineTranslator#" + hashCode(); - } - - - /////////////////////////////////////////////////////////////////////////// - - static { - registerTransformTranslator( - View.CreatePCollectionView.class, - new TransformTranslator() { - @Override - public void translate( - View.CreatePCollectionView transform, - TranslationContext context) { - translateTyped(transform, context); - } - - private void translateTyped( - View.CreatePCollectionView transform, - TranslationContext context) { - context.addStep(transform, "CollectionToSingleton"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addCollectionToSingletonOutput( - PropertyNames.OUTPUT, - context.getInput(transform), - context.getOutput(transform)); - } - }); - - DataflowPipelineTranslator.registerTransformTranslator( - Combine.GroupedValues.class, - new DataflowPipelineTranslator.TransformTranslator() { - @Override - public void translate( - Combine.GroupedValues transform, - DataflowPipelineTranslator.TranslationContext context) { - translateHelper(transform, context); - } - - private void translateHelper( - final Combine.GroupedValues transform, - DataflowPipelineTranslator.TranslationContext context) { - context.addStep(transform, "CombineValues"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); - - AppliedCombineFn fn = - transform.getAppliedFn( - context.getInput(transform).getPipeline().getCoderRegistry(), - context.getInput(transform).getCoder(), - context.getInput(transform).getWindowingStrategy()); - - context.addEncodingInput(fn.getAccumulatorCoder()); - context.addInput( - PropertyNames.SERIALIZED_FN, - byteArrayToJsonString(serializeToByteArray(fn))); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - }); - - registerTransformTranslator( - Create.Values.class, - new TransformTranslator() { - @Override - public void translate( - Create.Values transform, - TranslationContext context) { - createHelper(transform, context); - } - - private void createHelper( - Create.Values transform, - TranslationContext context) { - context.addStep(transform, "CreateCollection"); - - Coder coder = context.getOutput(transform).getCoder(); - List elements = new LinkedList<>(); - for (T elem : transform.getElements()) { - byte[] encodedBytes; - try { - encodedBytes = encodeToByteArray(coder, elem); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. 
- throw new IllegalArgumentException( - "Unable to encode element '" + elem + "' of transform '" + transform - + "' using coder '" + coder + "'.", - exn); - } - String encodedJson = byteArrayToJsonString(encodedBytes); - assert Arrays.equals(encodedBytes, - jsonStringToByteArray(encodedJson)); - elements.add(CloudObject.forString(encodedJson)); - } - context.addInput(PropertyNames.ELEMENT, elements); - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - }); - - registerTransformTranslator( - Flatten.FlattenPCollectionList.class, - new TransformTranslator() { - @Override - public void translate( - Flatten.FlattenPCollectionList transform, - TranslationContext context) { - flattenHelper(transform, context); - } - - private void flattenHelper( - Flatten.FlattenPCollectionList transform, - TranslationContext context) { - context.addStep(transform, "Flatten"); - - List inputs = new LinkedList<>(); - for (PCollection input : context.getInput(transform).getAll()) { - inputs.add(context.asOutputReference(input)); - } - context.addInput(PropertyNames.INPUTS, inputs); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - }); - - registerTransformTranslator( - GroupByKeyAndSortValuesOnly.class, - new TransformTranslator() { - @Override - public void translate( - GroupByKeyAndSortValuesOnly transform, - TranslationContext context) { - groupByKeyAndSortValuesHelper(transform, context); - } - - private void groupByKeyAndSortValuesHelper( - GroupByKeyAndSortValuesOnly transform, - TranslationContext context) { - context.addStep(transform, "GroupByKey"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - context.addInput(PropertyNames.SORT_VALUES, true); - - // TODO: Add support for combiner lifting once the need arises. - context.addInput( - PropertyNames.DISALLOW_COMBINER_LIFTING, true); - } - }); - - registerTransformTranslator( - GroupByKey.class, - new TransformTranslator() { - @Override - public void translate( - GroupByKey transform, - TranslationContext context) { - groupByKeyHelper(transform, context); - } - - private void groupByKeyHelper( - GroupByKey transform, - TranslationContext context) { - context.addStep(transform, "GroupByKey"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - - WindowingStrategy windowingStrategy = - context.getInput(transform).getWindowingStrategy(); - boolean isStreaming = - context.getPipelineOptions().as(StreamingOptions.class).isStreaming(); - boolean disallowCombinerLifting = - !windowingStrategy.getWindowFn().isNonMerging() - || (isStreaming && !transform.fewKeys()) - // TODO: Allow combiner lifting on the non-default trigger, as appropriate. 
- || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger); - context.addInput( - PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting); - context.addInput( - PropertyNames.SERIALIZED_FN, - byteArrayToJsonString(serializeToByteArray(windowingStrategy))); - context.addInput( - PropertyNames.IS_MERGING_WINDOW_FN, - !windowingStrategy.getWindowFn().isNonMerging()); - } - }); - - registerTransformTranslator( - ParDo.BoundMulti.class, - new TransformTranslator() { - @Override - public void translate( - ParDo.BoundMulti transform, - TranslationContext context) { - translateMultiHelper(transform, context); - } - - private void translateMultiHelper( - ParDo.BoundMulti transform, - TranslationContext context) { - context.addStep(transform, "ParallelDo"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); - translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(), - transform.getSideInputs(), context.getInput(transform).getCoder(), context); - translateOutputs(context.getOutput(transform), context); - } - }); - - registerTransformTranslator( - ParDo.Bound.class, - new TransformTranslator() { - @Override - public void translate( - ParDo.Bound transform, - TranslationContext context) { - translateSingleHelper(transform, context); - } - - private void translateSingleHelper( - ParDo.Bound transform, - TranslationContext context) { - context.addStep(transform, "ParallelDo"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); - translateFn( - transform.getFn(), - context.getInput(transform).getWindowingStrategy(), - transform.getSideInputs(), context.getInput(transform).getCoder(), context); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - }); - - registerTransformTranslator( - Window.Bound.class, - new DataflowPipelineTranslator.TransformTranslator() { - @Override - public void translate( - Window.Bound transform, TranslationContext context) { - translateHelper(transform, context); - } - - private void translateHelper( - Window.Bound transform, TranslationContext context) { - context.addStep(transform, "Bucket"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - - WindowingStrategy strategy = context.getOutput(transform).getWindowingStrategy(); - byte[] serializedBytes = serializeToByteArray(strategy); - String serializedJson = byteArrayToJsonString(serializedBytes); - assert Arrays.equals(serializedBytes, - jsonStringToByteArray(serializedJson)); - context.addInput(PropertyNames.SERIALIZED_FN, serializedJson); - } - }); - - /////////////////////////////////////////////////////////////////////////// - // IO Translation. 
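// Aside, not from the original commit: for orientation, the Step that a
// translator begins with context.addStep(transform, "ParallelRead") is,
// roughly, the following Dataflow API object. Names and values here are
// invented, and the real code routes properties through the Structs helpers
// rather than raw Map puts.

Step sketch = new Step();
sketch.setName("s1");            // as produced by genStepName()
sketch.setKind("ParallelRead");  // the "type" argument passed to addStep
Map<String, Object> properties = new HashMap<>();
properties.put(PropertyNames.USER_NAME, "MyPipeline/Read");  // simplified
sketch.setProperties(properties);
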
- - registerTransformTranslator( - BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator()); - registerTransformTranslator( - BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator()); - - registerTransformTranslator( - PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator()); - registerTransformTranslator( - DataflowPipelineRunner.StreamingPubsubIOWrite.class, - new PubsubIOTranslator.WriteTranslator()); - - registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); - } - - private static void translateInputs( - PCollection input, - List> sideInputs, - TranslationContext context) { - context.addInput(PropertyNames.PARALLEL_INPUT, input); - translateSideInputs(sideInputs, context); - } - - // Used for ParDo - private static void translateSideInputs( - List> sideInputs, - TranslationContext context) { - Map nonParInputs = new HashMap<>(); - - for (PCollectionView view : sideInputs) { - nonParInputs.put( - view.getTagInternal().getId(), - context.asOutputReference(view)); - } - - context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs); - } - - private static void translateFn( - DoFn fn, - WindowingStrategy windowingStrategy, - Iterable> sideInputs, - Coder inputCoder, - TranslationContext context) { - context.addInput(PropertyNames.USER_FN, fn.getClass().getName()); - context.addInput( - PropertyNames.SERIALIZED_FN, - byteArrayToJsonString(serializeToByteArray( - new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder)))); - } - - private static void translateOutputs( - PCollectionTuple outputs, - TranslationContext context) { - for (Map.Entry, PCollection> entry - : outputs.getAll().entrySet()) { - TupleTag tag = entry.getKey(); - PCollection output = entry.getValue(); - context.addOutput(tag.getId(), output); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java deleted file mode 100644 index 8b9f3f4..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners; - -import javax.annotation.Nullable; - -/** - * Signals there was an error retrieving information about a job from the Cloud Dataflow Service. 
- */ -public class DataflowServiceException extends DataflowJobException { - DataflowServiceException(DataflowPipelineJob job, String message) { - this(job, message, null); - } - - DataflowServiceException(DataflowPipelineJob job, String message, @Nullable Throwable cause) { - super(job, message, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java deleted file mode 100644 index 974c3a9..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.PCollection; - -/** - * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)} - * {@link PTransform}. - * - * For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies - * a primitive {@link PTransform} in the Dataflow service. - * - * For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn}, - * applies an identity {@link ParDo} and sets the windowing strategy of the output - * {@link PCollection}. - * - * For internal use only. - * - * @param the type of input element - */ -public class AssignWindows extends PTransform, PCollection> { - private final Window.Bound transform; - - /** - * Builds an instance of this class from the overriden transform. 
- */ - @SuppressWarnings("unused") // Used via reflection - public AssignWindows(Window.Bound transform) { - this.transform = transform; - } - - @Override - public PCollection apply(PCollection input) { - WindowingStrategy outputStrategy = - transform.getOutputStrategyInternal(input.getWindowingStrategy()); - if (transform.getWindowFn() != null) { - // If the windowFn changed, we create a primitive, and run the AssignWindows operation here. - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), outputStrategy, input.isBounded()); - } else { - // If the windowFn didn't change, we just run a pass-through transform and then set the - // new windowing strategy. - return input.apply(ParDo.named("Identity").of(new DoFn() { - @Override - public void processElement(DoFn.ProcessContext c) throws Exception { - c.output(c.element()); - } - })).setWindowingStrategyInternal(outputStrategy); - } - } - - @Override - public void validate(PCollection input) { - transform.validate(input); - } - - @Override - protected Coder getDefaultOutputCoder(PCollection input) { - return input.getCoder(); - } - - @Override - protected String getKindString() { - return "Window.Into()"; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java deleted file mode 100755 index 203ce4f..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import com.google.api.client.json.JsonFactory; -import com.google.api.services.bigquery.model.TableReference; -import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.cloud.dataflow.sdk.util.WindowedValue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * BigQuery transform support code for the Dataflow backend. 
- */ -public class BigQueryIOTranslator { - private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); - private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class); - - /** - * Implements BigQueryIO Read translation for the Dataflow backend. - */ - public static class ReadTranslator - implements DataflowPipelineTranslator.TransformTranslator { - - @Override - public void translate( - BigQueryIO.Read.Bound transform, DataflowPipelineTranslator.TranslationContext context) { - // Actual translation. - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, "bigquery"); - context.addInput(PropertyNames.BIGQUERY_EXPORT_FORMAT, "FORMAT_AVRO"); - - if (transform.getQuery() != null) { - context.addInput(PropertyNames.BIGQUERY_QUERY, transform.getQuery()); - context.addInput(PropertyNames.BIGQUERY_FLATTEN_RESULTS, transform.getFlattenResults()); - } else { - TableReference table = transform.getTable(); - if (table.getProjectId() == null) { - // If user does not specify a project we assume the table to be located in the project - // that owns the Dataflow job. - String projectIdFromOptions = context.getPipelineOptions().getProject(); - LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(), - table.getDatasetId(), table.getTableId(), projectIdFromOptions)); - table.setProjectId(projectIdFromOptions); - } - - context.addInput(PropertyNames.BIGQUERY_TABLE, table.getTableId()); - context.addInput(PropertyNames.BIGQUERY_DATASET, table.getDatasetId()); - if (table.getProjectId() != null) { - context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId()); - } - } - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - } - - /** - * Implements BigQueryIO Write translation for the Dataflow backend. - */ - public static class WriteTranslator - implements DataflowPipelineTranslator.TransformTranslator { - - @Override - public void translate(BigQueryIO.Write.Bound transform, - DataflowPipelineTranslator.TranslationContext context) { - if (context.getPipelineOptions().isStreaming()) { - // Streaming is handled by the streaming runner. - throw new AssertionError( - "BigQueryIO is specified to use streaming write in batch mode."); - } - - TableReference table = transform.getTable(); - - // Actual translation. - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.FORMAT, "bigquery"); - context.addInput(PropertyNames.BIGQUERY_TABLE, - table.getTableId()); - context.addInput(PropertyNames.BIGQUERY_DATASET, - table.getDatasetId()); - if (table.getProjectId() != null) { - context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId()); - } - if (transform.getSchema() != null) { - try { - context.addInput(PropertyNames.BIGQUERY_SCHEMA, - JSON_FACTORY.toString(transform.getSchema())); - } catch (IOException exn) { - throw new IllegalArgumentException("Invalid table schema.", exn); - } - } - context.addInput( - PropertyNames.BIGQUERY_CREATE_DISPOSITION, - transform.getCreateDisposition().name()); - context.addInput( - PropertyNames.BIGQUERY_WRITE_DISPOSITION, - transform.getWriteDisposition().name()); - // Set sink encoding to TableRowJsonCoder. 
- context.addEncodingInput( - WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of())); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java deleted file mode 100755 index 3473c41..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import static com.google.api.client.util.Base64.encodeBase64String; -import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray; -import static com.google.cloud.dataflow.sdk.util.Structs.addString; -import static com.google.cloud.dataflow.sdk.util.Structs.addStringList; -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.services.dataflow.model.SourceMetadata; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.Source; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - - -/** - * A helper class for supporting sources defined as {@code Source}. - * - *
<p>
Provides a bridge between the high-level {@code Source} API and the - * low-level {@code CloudSource} class. - */ -public class CustomSources { - private static final String SERIALIZED_SOURCE = "serialized_source"; - @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits"; - /** - * The current limit on the size of a ReportWorkItemStatus RPC to Google Cloud Dataflow, which - * includes the initial splits, is 20 MB. - */ - public static final long DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES = 20 * (1 << 20); - - private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class); - - private static final ByteString firstSplitKey = ByteString.copyFromUtf8("0000000000000001"); - - public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) { - return splitKey.equals(firstSplitKey); - } - - private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) { - if (options.getMaxNumWorkers() > 0) { - return options.getMaxNumWorkers(); - } else if (options.getNumWorkers() > 0) { - return options.getNumWorkers() * 3; - } else { - return 20; - } - } - - public static com.google.api.services.dataflow.model.Source serializeToCloudSource( - Source source, PipelineOptions options) throws Exception { - com.google.api.services.dataflow.model.Source cloudSource = - new com.google.api.services.dataflow.model.Source(); - // We ourselves act as the SourceFormat. - cloudSource.setSpec(CloudObject.forClass(CustomSources.class)); - addString( - cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source))); - - SourceMetadata metadata = new SourceMetadata(); - if (source instanceof BoundedSource) { - BoundedSource boundedSource = (BoundedSource) source; - try { - metadata.setProducesSortedKeys(boundedSource.producesSortedKeys(options)); - } catch (Exception e) { - LOG.warn("Failed to check if the source produces sorted keys: " + source, e); - } - - // Size estimation is best effort so we continue even if it fails here. 
- try { - metadata.setEstimatedSizeBytes(boundedSource.getEstimatedSizeBytes(options)); - } catch (Exception e) { - LOG.warn("Size estimation of the source failed: " + source, e); - } - } else if (source instanceof UnboundedSource) { - UnboundedSource unboundedSource = (UnboundedSource) source; - metadata.setInfinite(true); - List encodedSplits = new ArrayList<>(); - int desiredNumSplits = - getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class)); - for (UnboundedSource split : - unboundedSource.generateInitialSplits(desiredNumSplits, options)) { - encodedSplits.add(encodeBase64String(serializeToByteArray(split))); - } - checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split"); - addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits); - } else { - throw new IllegalArgumentException("Unexpected source kind: " + source.getClass()); - } - - cloudSource.setMetadata(metadata); - return cloudSource; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java deleted file mode 100755 index 5c1dd95..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; - -/** - * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used. 
- */
-public class DataflowAggregatorTransforms {
-  private final Map<Aggregator<?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
-  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
-  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
-  public DataflowAggregatorTransforms(
-      Map<Aggregator<?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
-      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
-    this.aggregatorTransforms = aggregatorTransforms;
-    appliedStepNames = HashBiMap.create(transformStepNames);
-
-    transformAppliedTransforms = HashMultimap.create();
-    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
-      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
-    }
-  }
-
-  /**
-   * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
-   */
-  public boolean contains(Aggregator<?> aggregator) {
-    return aggregatorTransforms.containsKey(aggregator);
-  }
-
-  /**
-   * Gets the step names in which the {@link Aggregator} is used.
-   */
-  public Collection<String> getAggregatorStepNames(Aggregator<?> aggregator) {
-    Collection<String> names = new HashSet<>();
-    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
-    for (PTransform<?, ?> transform : transforms) {
-      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
-        names.add(appliedStepNames.get(applied));
-      }
-    }
-    return names;
-  }
-
-  /**
-   * Gets the {@link PTransform} that was assigned the provided step name.
-   */
-  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
-    return appliedStepNames.inverse().get(stepName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
deleted file mode 100755
index 8de1038..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-public final class DataflowMetricUpdateExtractor {
-  private static final String STEP_NAME_CONTEXT_KEY = "step";
-  private static final String IS_TENTATIVE_KEY = "tentative";
-
-  private DataflowMetricUpdateExtractor() {
-    // Do not instantiate.
-  }
-
-  /**
-   * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was
-   * used in according to the provided {@link DataflowAggregatorTransforms} from the given
-   * list of {@link MetricUpdate MetricUpdates}.
-   */
-  public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<OutputT> aggregator,
-      DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
-    Map<String, OutputT> results = new HashMap<>();
-    if (metricUpdates == null) {
-      return results;
-    }
-
-    String aggregatorName = aggregator.getName();
-    Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
-    for (MetricUpdate metricUpdate : metricUpdates) {
-      MetricStructuredName metricStructuredName = metricUpdate.getName();
-      Map<String, String> context = metricStructuredName.getContext();
-      if (metricStructuredName.getName().equals(aggregatorName) && context != null
-          && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
-        AppliedPTransform<?, ?, ?> transform =
-            aggregatorTransforms.getAppliedTransformForStepName(
-                context.get(STEP_NAME_CONTEXT_KEY));
-        String fullName = transform.getFullName();
-        // Prefer the tentative (fresher) value if it exists.
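For orientation, a sketch of the kind of update this matching logic accepts; the values are hypothetical, and the setters are the standard generated model setters:

    import com.google.api.services.dataflow.model.MetricStructuredName;
    import com.google.api.services.dataflow.model.MetricUpdate;
    import com.google.common.collect.ImmutableMap;

    public class TentativeUpdateSketch {
      public static void main(String[] args) {
        // An update for aggregator "myAggregator" reported at step "s2"; the
        // "tentative" context flag marks it as fresher than a committed value.
        MetricUpdate update = new MetricUpdate()
            .setScalar(42L)
            .setName(new MetricStructuredName()
                .setName("myAggregator")
                .setContext(ImmutableMap.of("step", "s2", "tentative", "true")));
        System.out.println(update.getScalar());
      }
    }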
- if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) { - results.put(fullName, toValue(aggregator, metricUpdate)); - } - } - } - - return results; - - } - - private static OutputT toValue( - Aggregator aggregator, MetricUpdate metricUpdate) { - CombineFn combineFn = aggregator.getCombineFn(); - Class outputType = combineFn.getOutputType().getRawType(); - - if (outputType.equals(Long.class)) { - @SuppressWarnings("unchecked") - OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue()); - return asLong; - } - if (outputType.equals(Integer.class)) { - @SuppressWarnings("unchecked") - OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue()); - return asInt; - } - if (outputType.equals(Double.class)) { - @SuppressWarnings("unchecked") - OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue()); - return asDouble; - } - throw new UnsupportedOperationException( - "Unsupported Output Type " + outputType + " in aggregator " + aggregator); - } - - private static Number toNumber(MetricUpdate update) { - if (update.getScalar() instanceof Number) { - return (Number) update.getScalar(); - } - throw new IllegalArgumentException( - "Metric Update " + update + " does not have a numeric scalar"); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java deleted file mode 100755 index b54d5c6..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.WindowedValue; - -/** - * Pubsub transform support code for the Dataflow backend. - */ -public class PubsubIOTranslator { - - /** - * Implements PubsubIO Read translation for the Dataflow backend. 
- */ - public static class ReadTranslator implements TransformTranslator> { - @Override - @SuppressWarnings({"rawtypes", "unchecked"}) - public void translate( - PubsubIO.Read.Bound transform, - TranslationContext context) { - translateReadHelper(transform, context); - } - - private void translateReadHelper( - PubsubIO.Read.Bound transform, - TranslationContext context) { - if (!context.getPipelineOptions().isStreaming()) { - throw new IllegalArgumentException( - "PubsubIO.Read can only be used with the Dataflow streaming runner."); - } - - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, "pubsub"); - if (transform.getTopic() != null) { - context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path()); - } - if (transform.getSubscription() != null) { - context.addInput( - PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path()); - } - if (transform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel()); - } - if (transform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel()); - } - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - } - - /** - * Implements PubsubIO Write translation for the Dataflow backend. - */ - public static class WriteTranslator - implements TransformTranslator> { - - @Override - @SuppressWarnings({"rawtypes", "unchecked"}) - public void translate( - DataflowPipelineRunner.StreamingPubsubIOWrite transform, - TranslationContext context) { - translateWriteHelper(transform, context); - } - - private void translateWriteHelper( - DataflowPipelineRunner.StreamingPubsubIOWrite customTransform, - TranslationContext context) { - if (!context.getPipelineOptions().isStreaming()) { - throw new IllegalArgumentException( - "PubsubIO.Write is non-primitive for the Dataflow batch runner."); - } - - PubsubIO.Write.Bound transform = customTransform.getOverriddenTransform(); - - context.addStep(customTransform, "ParallelWrite"); - context.addInput(PropertyNames.FORMAT, "pubsub"); - context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path()); - if (transform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel()); - } - if (transform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel()); - } - context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder())); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java deleted file mode 100755 index 4998af3..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean; -import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary; -import static com.google.cloud.dataflow.sdk.util.Structs.addLong; - -import com.google.api.services.dataflow.model.SourceMetadata; -import com.google.cloud.dataflow.sdk.io.FileBasedSource; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.Source; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.values.PValue; - -import java.util.HashMap; -import java.util.Map; - -/** - * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. - */ -public class ReadTranslator implements TransformTranslator> { - @Override - public void translate(Read.Bounded transform, TranslationContext context) { - translateReadHelper(transform.getSource(), transform, context); - } - - public static void translateReadHelper(Source source, - PTransform transform, - DataflowPipelineTranslator.TranslationContext context) { - try { - // TODO: Move this validation out of translation once IOChannelUtils is portable - // and can be reconstructed on the worker. - if (source instanceof FileBasedSource) { - String filePatternOrSpec = ((FileBasedSource) source).getFileOrPatternSpec(); - context.getPipelineOptions() - .getPathValidator() - .validateInputFilePatternSupported(filePatternOrSpec); - } - - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); - context.addInput( - PropertyNames.SOURCE_STEP_INPUT, - cloudSourceToDictionary( - CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT} - // property of CloudWorkflowStep.input. - private static Map cloudSourceToDictionary( - com.google.api.services.dataflow.model.Source source) { - // Do not translate encoding - the source's encoding is translated elsewhere - // to the step's output info. 
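For orientation, the nested dictionary assembled below has roughly this shape. The sketch uses plain maps, and the literal key strings merely stand in for the PropertyNames constants (assumed values, not confirmed by this commit):

    import java.util.HashMap;
    import java.util.Map;

    public class CloudSourceDictShape {
      public static void main(String[] args) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("estimated_size_bytes", 1048576L); // present only if estimation succeeded
        metadata.put("is_infinite", false);             // set for unbounded sources

        Map<String, Object> dict = new HashMap<>();
        dict.put("spec", "<serialized source spec>");   // always present
        dict.put("metadata", metadata);                 // optional
        dict.put("does_not_need_splitting", false);     // optional
      }
    }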
- Map res = new HashMap<>(); - addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec()); - if (source.getMetadata() != null) { - addDictionary(res, PropertyNames.SOURCE_METADATA, - cloudSourceMetadataToDictionary(source.getMetadata())); - } - if (source.getDoesNotNeedSplitting() != null) { - addBoolean( - res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting()); - } - return res; - } - - private static Map cloudSourceMetadataToDictionary(SourceMetadata metadata) { - Map res = new HashMap<>(); - if (metadata.getProducesSortedKeys() != null) { - addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, metadata.getProducesSortedKeys()); - } - if (metadata.getEstimatedSizeBytes() != null) { - addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes()); - } - if (metadata.getInfinite() != null) { - addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite()); - } - return res; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java deleted file mode 100755 index d1484ce..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Implementation of the {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}. - */ -package com.google.cloud.dataflow.sdk.runners.dataflow; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java deleted file mode 100644 index 2f920a6..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {
-
-}
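Although the interface declares no members of its own, it gives tests a single type that carries every option the blocking Dataflow runner understands. A minimal usage sketch (project and bucket names hypothetical):

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;

    public class TestOptionsSketch {
      public static void main(String[] args) {
        TestDataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(TestDataflowPipelineOptions.class);
        options.setProject("my-test-project");         // hypothetical project id
        options.setTempLocation("gs://my-bucket/tmp"); // hypothetical GCS path
      }
    }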