Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm
Precedence: bulk
Reply-To: dev@beam.incubator.apache.org
Delivered-To: mailing list commits@beam.incubator.apache.org
From: lcwik@apache.org
To: commits@beam.incubator.apache.org
Date: Mon, 11 Apr 2016 23:42:50 -0000
Message-Id: <504373b75e374345a680c66f70d0d82c@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [16/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
new file mode 100644
index 0000000..4a4f100
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * An exception that is thrown if the unique job name constraint of the Dataflow
+ * service is broken because an existing job with the same job name is currently active.
+ * The {@link DataflowPipelineJob} contained within this exception contains information
+ * about the pre-existing job.
+ */
+public class DataflowJobAlreadyExistsException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
+   * DataflowPipelineJob} and message.
+   */
+  public DataflowJobAlreadyExistsException(
+      DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
new file mode 100644
index 0000000..1f52c6a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * An exception that is thrown if the existing job has already been updated within the Dataflow
+ * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within
+ * this exception contains information about the pre-existing updated job.
+ */
+public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
+   * DataflowPipelineJob} and message.
+   */
+  public DataflowJobAlreadyUpdatedException(
+      DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
new file mode 100644
index 0000000..495ca5a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during execution.
+ */
+public class DataflowJobCancelledException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobCancelledException} with the specified {@link
+   * DataflowPipelineJob} and message.
+   */
+  public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+
+  /**
+   * Create a new {@code DataflowJobCancelledException} with the specified {@link
+   * DataflowPipelineJob}, message, and cause.
+   */
+  public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
new file mode 100644
index 0000000..a22d13c
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
+ */
+public abstract class DataflowJobException extends RuntimeException {
+  private final DataflowPipelineJob job;
+
+  DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+    super(message, cause);
+    this.job = Objects.requireNonNull(job);
+  }
+
+  /**
+   * Returns the failed job.
+   */
+  public DataflowPipelineJob getJob() {
+    return job;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
new file mode 100644
index 0000000..3dbb007
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import javax.annotation.Nullable;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} fails during execution, and
+ * provides access to the failed job.
+ */
+public class DataflowJobExecutionException extends DataflowJobException {
+  DataflowJobExecutionException(DataflowPipelineJob job, String message) {
+    this(job, message, null);
+  }
+
+  DataflowJobExecutionException(
+      DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
new file mode 100644
index 0000000..72deb45
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
+ */
+public class DataflowJobUpdatedException extends DataflowJobException {
+  private DataflowPipelineJob replacedByJob;
+
+  /**
+   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+   * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
+   */
+  public DataflowJobUpdatedException(
+      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
+    this(job, message, replacedByJob, null);
+  }
+
+  /**
+   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+   * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
+   */
+  public DataflowJobUpdatedException(
+      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
+    super(job, message, cause);
+    this.replacedByJob = replacedByJob;
+  }
+
+  /**
+   * The new job that replaces the job terminated with this exception.
+   */
+  public DataflowPipelineJob getReplacedByJob() {
+    return replacedByJob;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
new file mode 100644
index 0000000..7b97c7d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
+ * {@link DataflowPipelineJob} when it is
+ * {@link com.google.cloud.dataflow.sdk.Pipeline#run() run}.
+ *
+ * <p>This is not intended for use by users of Cloud Dataflow.
+ * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
+ * {@link Pipeline}.
+ */
+public class DataflowPipeline extends Pipeline {
+
+  /**
+   * Creates and returns a new {@link DataflowPipeline} instance for tests.
+   */
+  public static DataflowPipeline create(DataflowPipelineOptions options) {
+    return new DataflowPipeline(options);
+  }
+
+  private DataflowPipeline(DataflowPipelineOptions options) {
+    super(DataflowPipelineRunner.fromOptions(options), options);
+  }
+
+  @Override
+  public DataflowPipelineJob run() {
+    return (DataflowPipelineJob) super.run();
+  }
+
+  @Override
+  public DataflowPipelineRunner getRunner() {
+    return (DataflowPipelineRunner) super.getRunner();
+  }
+
+  @Override
+  public String toString() {
+    return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
new file mode 100644
index 0000000..632be6d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
+import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A DataflowPipelineJob represents a job submitted to Dataflow using
+ * {@link DataflowPipelineRunner}.
+ */
+public class DataflowPipelineJob implements PipelineResult {
+  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
+
+  /**
+   * The id for the job.
+   */
+  private String jobId;
+
+  /**
+   * Google cloud project to associate this pipeline with.
+   */
+  private String projectId;
+
+  /**
+   * Client for the Dataflow service. This can be used to query the service
+   * for information about the job.
+   */
+  private Dataflow dataflowClient;
+
+  /**
+   * The state the job terminated in or {@code null} if the job has not terminated.
+   */
+  @Nullable
+  private State terminalState = null;
+
+  /**
+   * The job that replaced this one or {@code null} if the job has not been replaced.
+   */
+  @Nullable
+  private DataflowPipelineJob replacedByJob = null;
+
+  private DataflowAggregatorTransforms aggregatorTransforms;
+
+  /**
+   * The Metric Updates retrieved after the job was in a terminal state.
+   */
+  private List<MetricUpdate> terminalMetricUpdates;
+
+  /**
+   * The polling interval for job status and messages information.
+   */
+  static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+  static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+
+  /**
+   * The amount of polling attempts for job status and messages information.
+   */
+  static final int MESSAGES_POLLING_ATTEMPTS = 10;
+  static final int STATUS_POLLING_ATTEMPTS = 5;
+
+  /**
+   * Constructs the job.
+   *
+   * @param projectId the project id
+   * @param jobId the job id
+   * @param dataflowClient the client for the Dataflow Service
+   */
+  public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
+      DataflowAggregatorTransforms aggregatorTransforms) {
+    this.projectId = projectId;
+    this.jobId = jobId;
+    this.dataflowClient = dataflowClient;
+    this.aggregatorTransforms = aggregatorTransforms;
+  }
+
+  /**
+   * Get the id of this job.
+   */
+  public String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * Get the project this job exists in.
+   */
+  public String getProjectId() {
+    return projectId;
+  }
+
+  /**
+   * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
+   *
+   * @throws IllegalStateException if called before the job has terminated or if the job terminated
+   *     but was not updated
+   */
+  public DataflowPipelineJob getReplacedByJob() {
+    if (terminalState == null) {
+      throw new IllegalStateException("getReplacedByJob() called before job terminated");
+    }
+    if (replacedByJob == null) {
+      throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
+    }
+    return replacedByJob;
+  }
+
+  /**
+   * Get the Cloud Dataflow API Client used by this job.
+   */
+  public Dataflow getDataflowClient() {
+    return dataflowClient;
+  }
+
+  /**
+   * Waits for the job to finish and return the final status.
+   *
+   * @param timeToWait The time to wait in units timeUnit for the job to finish.
+   *     Provide a value less than 1 ms for an infinite wait.
+   * @param timeUnit The unit of time for timeToWait.
+   * @param messageHandler If non null this handler will be invoked for each
+   *     batch of messages received.
+   * @return The final state of the job or null on timeout or if the
+   *     thread is interrupted.
+   * @throws IOException If there is a persistent problem getting job
+   *     information.
+   * @throws InterruptedException
+   */
+  @Nullable
+  public State waitToFinish(
+      long timeToWait,
+      TimeUnit timeUnit,
+      MonitoringUtil.JobMessagesHandler messageHandler)
+          throws IOException, InterruptedException {
+    return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+  }
+
+  /**
+   * Wait for the job to finish and return the final status.
+   *
+   * @param timeToWait The time to wait in units timeUnit for the job to finish.
+   *     Provide a value less than 1 ms for an infinite wait.
+   * @param timeUnit The unit of time for timeToWait.
+   * @param messageHandler If non null this handler will be invoked for each
+   *     batch of messages received.
+   * @param sleeper A sleeper to use to sleep between attempts.
+   * @param nanoClock A nanoClock used to time the total time taken.
+   * @return The final state of the job or null on timeout or if the
+   *     thread is interrupted.
+   * @throws IOException If there is a persistent problem getting job
+   *     information.
+   * @throws InterruptedException
+   */
+  @Nullable
+  @VisibleForTesting
+  State waitToFinish(
+      long timeToWait,
+      TimeUnit timeUnit,
+      MonitoringUtil.JobMessagesHandler messageHandler,
+      Sleeper sleeper,
+      NanoClock nanoClock)
+          throws IOException, InterruptedException {
+    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
+
+    long lastTimestamp = 0;
+    BackOff backoff =
+        timeUnit.toMillis(timeToWait) > 0
+            ? new AttemptAndTimeBoundedExponentialBackOff(
+                MESSAGES_POLLING_ATTEMPTS,
+                MESSAGES_POLLING_INTERVAL,
+                timeUnit.toMillis(timeToWait),
+                AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+                nanoClock)
+            : new AttemptBoundedExponentialBackOff(
+                MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
+    State state;
+    do {
+      // Get the state of the job before listing messages. This ensures we always fetch job
+      // messages after the job finishes to ensure we have all of them.
+      state = getStateWithRetries(1, sleeper);
+      boolean hasError = state == State.UNKNOWN;
+
+      if (messageHandler != null && !hasError) {
+        // Process all the job messages that have accumulated so far.
+        try {
+          List<JobMessage> allMessages = monitor.getJobMessages(
+              jobId, lastTimestamp);
+
+          if (!allMessages.isEmpty()) {
+            lastTimestamp =
+                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
+            messageHandler.process(allMessages);
+          }
+        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
+          hasError = true;
+          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
+          LOG.debug("Exception information:", e);
+        }
+      }
+
+      if (!hasError) {
+        backoff.reset();
+        // Check if the job is done.
+        if (state.isTerminal()) {
+          return state;
+        }
+      }
+    } while (BackOffUtils.next(sleeper, backoff));
+    LOG.warn("No terminal state was returned. State value {}", state);
+    return null;  // Timed out.
+  }
+
+  /**
+   * Cancels the job.
+   * @throws IOException if there is a problem executing the cancel request.
+   */
+  public void cancel() throws IOException {
+    Job content = new Job();
+    content.setProjectId(projectId);
+    content.setId(jobId);
+    content.setRequestedState("JOB_STATE_CANCELLED");
+    dataflowClient.projects().jobs()
+        .update(projectId, jobId, content)
+        .execute();
+  }
+
+  @Override
+  public State getState() {
+    if (terminalState != null) {
+      return terminalState;
+    }
+
+    return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
+  }
+
+  /**
+   * Attempts to get the state. Uses exponential backoff on failure up to the maximum number
+   * of passed in attempts.
+   *
+   * @param attempts The amount of attempts to make.
+   * @param sleeper Object used to do the sleeps between attempts.
+   * @return The state of the job or State.UNKNOWN in case of failure.
+   */
+  @VisibleForTesting
+  State getStateWithRetries(int attempts, Sleeper sleeper) {
+    if (terminalState != null) {
+      return terminalState;
+    }
+    try {
+      Job job = getJobWithRetries(attempts, sleeper);
+      return MonitoringUtil.toState(job.getCurrentState());
+    } catch (IOException exn) {
+      // The only IOException that getJobWithRetries is permitted to throw is the final IOException
+      // that caused the failure of retry. Other exceptions are wrapped in unchecked exceptions
+      // and will propagate.
+      return State.UNKNOWN;
+    }
+  }
+
+  /**
+   * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
+   * maximum number of passed in attempts.
+   *
+   * @param attempts The amount of attempts to make.
+   * @param sleeper Object used to do the sleeps between attempts.
+   * @return The underlying {@link Job} object.
+   * @throws IOException When the maximum number of retries is exhausted, the last exception is
+   *     thrown.
+   */
+  @VisibleForTesting
+  Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
+    AttemptBoundedExponentialBackOff backoff =
+        new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
+
+    // Retry loop ends in return or throw
+    while (true) {
+      try {
+        Job job = dataflowClient
+            .projects()
+            .jobs()
+            .get(projectId, jobId)
+            .execute();
+        State currentState = MonitoringUtil.toState(job.getCurrentState());
+        if (currentState.isTerminal()) {
+          terminalState = currentState;
+          replacedByJob = new DataflowPipelineJob(
+              getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
+        }
+        return job;
+      } catch (IOException exn) {
+        LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
+        LOG.debug("Exception information:", exn);
+
+        if (!nextBackOff(sleeper, backoff)) {
+          throw exn;
+        }
+      }
+    }
+  }
+
+  /**
+   * Identical to {@link BackOffUtils#next} but without checked exceptions.
+   */
+  private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (InterruptedException | IOException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
+      throws AggregatorRetrievalException {
+    try {
+      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
+    } catch (IOException e) {
+      throw new AggregatorRetrievalException(
+          "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
+    }
+  }
+
+  private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
+      throws IOException {
+    if (aggregatorTransforms.contains(aggregator)) {
+      List<MetricUpdate> metricUpdates;
+      if (terminalMetricUpdates != null) {
+        metricUpdates = terminalMetricUpdates;
+      } else {
+        boolean terminal = getState().isTerminal();
+        JobMetrics jobMetrics =
+            dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
+        metricUpdates = jobMetrics.getMetrics();
+        if (terminal && jobMetrics.getMetrics() != null) {
+          terminalMetricUpdates = metricUpdates;
+        }
+      }
+
+      return DataflowMetricUpdateExtractor.fromMetricUpdates(
+          aggregator, aggregatorTransforms, metricUpdates);
+    } else {
+      throw new IllegalArgumentException(
+          "Aggregator " + aggregator + " is not used in this pipeline");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
new file mode 100644
index 0000000..8aaa7cc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.auto.service.AutoService;
+import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
+ * the {@link DataflowPipeline}.
+ */
+public class DataflowPipelineRegistrar {
+  private DataflowPipelineRegistrar() { }
+
+  /**
+   * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(
+          DataflowPipelineOptions.class,
+          BlockingDataflowPipelineOptions.class);
+    }
+  }
+
+  /**
+   * Register the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          DataflowPipelineRunner.class,
+          BlockingDataflowPipelineRunner.class);
+    }
+  }
+}
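
For orientation, the classes moved in this patch fit together roughly as in the following sketch. This is an editor's illustration, not part of the commit: the option values taken from args, the one-hour timeout, the null message handler, and the class name SubmitAndWait are assumptions made for the example; only the SDK types shown in the diff above are taken as given.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowJobException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

import java.util.concurrent.TimeUnit;

public class SubmitAndWait {
  public static void main(String[] args) throws Exception {
    // DataflowPipelineRegistrar's @AutoService entries make these options and runners
    // discoverable by PipelineOptionsFactory's ServiceLoader-based lookup.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms here ...

    try {
      // With DataflowPipelineRunner, run() submits the job and returns a DataflowPipelineJob.
      DataflowPipelineJob job = (DataflowPipelineJob) pipeline.run();

      // Poll the service until the job reaches a terminal state, giving up after an hour.
      // Passing a null message handler skips per-message processing.
      PipelineResult.State finalState = job.waitToFinish(1, TimeUnit.HOURS, null);
      if (finalState == null) {
        // Timed out (or interrupted) before a terminal state was reached; stop the job.
        job.cancel();
      }
    } catch (DataflowJobException e) {
      // The DataflowJobException subclasses introduced here (already exists, already updated,
      // cancelled, updated, execution failure) all expose the affected job via getJob().
      System.err.println("Job " + e.getJob().getJobId() + " failed: " + e.getMessage());
    }
  }
}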