From: lcwik@apache.org
To: commits@beam.incubator.apache.org
Date: Wed, 30 Mar 2016 23:28:49 -0000
Subject: [1/2] incubator-beam git commit: Re-interrupt current thread when ignoring InterruptedException

Repository: incubator-beam
Updated Branches:
  refs/heads/master 78abd964a -> c6aac3b26


Re-interrupt current thread when ignoring InterruptedException


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8eeaf5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8eeaf5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8eeaf5e

Branch: refs/heads/master
Commit: a8eeaf5e1f0c74beb96ad78e73af4751cd650262
Parents: 78abd96
Author: Pei He
Authored: Mon Mar 14 17:34:57 2016 -0700
Committer: Luke Cwik
Committed: Wed Mar 30 16:27:23 2016 -0700

----------------------------------------------------------------------
 .../dataflow/examples/common/DataflowExampleUtils.java  |  8 +++-----
 .../com/google/cloud/dataflow/sdk/io/BigQueryIO.java    |  3 +++
 .../dataflow/sdk/io/BoundedReadFromUnboundedSource.java |  6 +++---
 .../sdk/runners/BlockingDataflowPipelineRunner.java     |  3 +++
 .../cloud/dataflow/sdk/runners/DataflowPipelineJob.java |  3 +++
 .../sdk/transforms/IntraBundleParallelization.java      |  1 +
 .../cloud/dataflow/sdk/util/BigQueryTableInserter.java  |  2 ++
 .../dataflow/sdk/util/BigQueryTableRowIterator.java     | 12 +++++++-----
 .../com/google/cloud/dataflow/sdk/util/GcsUtil.java     |  4 ++++
 9 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
index 4dfdd85..23562d0 100644
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
@@ -50,6 +50,7 @@ import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;

 import java.io.IOException;
 import java.util.Collection;
@@ -108,6 +109,7 @@ public class DataflowExampleUtils {
         }
       } while (BackOffUtils.next(sleeper, backOff));
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       // Ignore InterruptedException
     }
     Throwables.propagate(lastException);
@@ -442,11 +444,7 @@ public class DataflowExampleUtils {
             System.out.println(
                 "The example pipeline is still running. Verifying the cancellation.");
           }
-          try {
-            Thread.sleep(10000);
-          } catch (InterruptedException e) {
-            // Ignore
-          }
+          Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
         }
         if (!cancellationVerified) {
           System.out.println("Failed to verify the cancellation for job: " + job.getJobId());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
index ab7df6f..8b08225 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -1438,6 +1438,9 @@ public class BigQueryIO {
         LOG.info("Number of records read from BigQuery: {}", elems.size());
         context.setPCollection(context.getOutput(transform), elems);
       } catch (IOException | InterruptedException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
         throw new RuntimeException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
index 52c730c..3fa9c69 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
@@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff;
 import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.util.concurrent.Uninterruptibles;

 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -37,6 +38,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;


 /**
@@ -234,9 +236,7 @@ class BoundedReadFromUnboundedSource extends PTransform