beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] incubator-beam git commit: Re-interrupt current thread when ignoring InterruptedException
Date Wed, 30 Mar 2016 23:28:49 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 78abd964a -> c6aac3b26


Re-interrupt current thread when ignoring InterruptedException


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8eeaf5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8eeaf5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8eeaf5e

Branch: refs/heads/master
Commit: a8eeaf5e1f0c74beb96ad78e73af4751cd650262
Parents: 78abd96
Author: Pei He <peihe0@gmail.com>
Authored: Mon Mar 14 17:34:57 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Wed Mar 30 16:27:23 2016 -0700

----------------------------------------------------------------------
 .../dataflow/examples/common/DataflowExampleUtils.java  |  8 +++-----
 .../com/google/cloud/dataflow/sdk/io/BigQueryIO.java    |  3 +++
 .../dataflow/sdk/io/BoundedReadFromUnboundedSource.java |  6 +++---
 .../sdk/runners/BlockingDataflowPipelineRunner.java     |  3 +++
 .../cloud/dataflow/sdk/runners/DataflowPipelineJob.java |  3 +++
 .../sdk/transforms/IntraBundleParallelization.java      |  1 +
 .../cloud/dataflow/sdk/util/BigQueryTableInserter.java  |  2 ++
 .../dataflow/sdk/util/BigQueryTableRowIterator.java     | 12 +++++++-----
 .../com/google/cloud/dataflow/sdk/util/GcsUtil.java     |  4 ++++
 9 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
index 4dfdd85..23562d0 100644
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
@@ -50,6 +50,7 @@ import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -108,6 +109,7 @@ public class DataflowExampleUtils {
         }
       } while (BackOffUtils.next(sleeper, backOff));
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       // Ignore InterruptedException
     }
     Throwables.propagate(lastException);
@@ -442,11 +444,7 @@ public class DataflowExampleUtils {
               System.out.println(
                   "The example pipeline is still running. Verifying the cancellation.");
             }
-            try {
-              Thread.sleep(10000);
-            } catch (InterruptedException e) {
-              // Ignore
-            }
+            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
           }
           if (!cancellationVerified) {
             System.out.println("Failed to verify the cancellation for job: " + job.getJobId());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
index ab7df6f..8b08225 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -1438,6 +1438,9 @@ public class BigQueryIO {
       LOG.info("Number of records read from BigQuery: {}", elems.size());
       context.setPCollection(context.getOutput(transform), elems);
     } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       throw new RuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
index 52c730c..3fa9c69 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java
@@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff;
 import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -37,6 +38,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -234,9 +236,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput,
PCollection<T
           if (reader.advance()) {
             return true;
           }
-          try {
-            Thread.sleep(nextSleep);
-          } catch (InterruptedException e) {}
+          Uninterruptibles.sleepUninterruptibly(nextSleep, TimeUnit.MILLISECONDS);
           nextSleep = backoff.nextBackOffMillis();
         }
         finalizeCheckpoint();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
index 95e3dfe..7b45e5b 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
@@ -118,6 +118,9 @@ public class BlockingDataflowPipelineRunner extends
             BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
             new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
       } catch (IOException | InterruptedException ex) {
+        if (ex instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
         LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(),
ex);
         throw new DataflowServiceException(
             job, "Exception caught while retrieving status for job " + job.getJobId(), ex);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
index e9f134c..c5173a9 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
@@ -348,6 +348,9 @@ public class DataflowPipelineJob implements PipelineResult {
     try {
       return BackOffUtils.next(sleeper, backoff);
     } catch (InterruptedException | IOException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       throw Throwables.propagate(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
index b6497b7..39d2dc8 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
@@ -204,6 +204,7 @@ public class IntraBundleParallelization {
       try {
         workTickets.acquire();
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new RuntimeException("Interrupted while scheduling work", e);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
index cd51062..4a9ea6b 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
@@ -248,6 +248,7 @@ public class BigQueryTableInserter {
           }
         }
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new IOException("Interrupted while inserting " + rowsToPublish);
       } catch (ExecutionException e) {
         Throwables.propagate(e.getCause());
@@ -257,6 +258,7 @@ public class BigQueryTableInserter {
         try {
           Thread.sleep(backoff.nextBackOffMillis());
         } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
           throw new IOException("Interrupted while waiting before retrying insert of " +
retryRows);
         }
         LOG.info("Retrying failed inserts to BigQuery");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
index c2c80f7..75b3bb9 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
@@ -45,6 +45,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -59,6 +60,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -412,11 +414,8 @@ public class BigQueryTableRowIterator implements AutoCloseable {
           throw new IOException("Executing query " + query + " failed: " + error.getMessage());
         }
       }
-      try {
-        Thread.sleep(QUERY_COMPLETION_POLL_TIME.getMillis());
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
+      Uninterruptibles.sleepUninterruptibly(
+          QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS);
     }
   }
 
@@ -462,6 +461,9 @@ public class BigQueryTableRowIterator implements AutoCloseable {
         deleteDataset(temporaryDatasetId);
       }
     } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       throw new RuntimeException(e);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java
index 8fd258f..47adb59 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java
@@ -159,6 +159,9 @@ public class GcsUtil {
             IOException.class);
         return ImmutableList.of(gcsPattern);
       } catch (IOException | InterruptedException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
         if (e instanceof IOException && errorExtractor.itemNotFound((IOException)
e)) {
           // If the path was not found, return an empty list.
           return ImmutableList.of();
@@ -343,6 +346,7 @@ public class GcsUtil {
         }
         throw e;
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new IOException(
             String.format("Error while attempting to verify existence of bucket gs://%s",
                 path.getBucket()), e);


Mime
View raw message