hadoop-ozone-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adorosz...@apache.org
Subject [hadoop-ozone] branch master updated: HDDS-2504. Handle InterruptedException properly (#386)
Date Thu, 02 Jan 2020 10:54:57 GMT
This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 87f69cb  HDDS-2504. Handle InterruptedException properly (#386)
87f69cb is described below

commit 87f69cbc1b0a089fbe2f06b56f161766a7c2f6ab
Author: Vivek Ratnavel Subramanian <vivekratnavel90@gmail.com>
AuthorDate: Thu Jan 2 16:24:48 2020 +0530

    HDDS-2504. Handle InterruptedException properly (#386)
---
 .../hadoop/hdds/scm/storage/CommitWatcher.java     | 23 ++++++++++------
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   | 23 +++++++++++-----
 .../hadoop/hdds/utils/BackgroundService.java       | 10 ++++++-
 .../org/apache/hadoop/hdds/utils/Scheduler.java    | 31 ++++++++++++----------
 4 files changed, 58 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 766065f..ebcc6dc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -164,7 +164,6 @@ public class CommitWatcher {
     }
   }
 
-
   private void adjustBuffers(long commitIndex) {
     List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
         .filter(p -> p <= commitIndex).collect(Collectors.toList());
@@ -180,7 +179,6 @@ public class CommitWatcher {
     adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
   }
 
-
   /**
    * calls watchForCommit API of the Ratis Client. For Standalone client,
    * it is a no op.
@@ -201,15 +199,24 @@ public class CommitWatcher {
       }
       adjustBuffers(index);
       return reply;
-    } catch (TimeoutException | InterruptedException | ExecutionException e) {
-      LOG.warn("watchForCommit failed for index " + commitIndex, e);
-      IOException ioException = new IOException(
-          "Unexpected Storage Container Exception: " + e.toString(), e);
-      releaseBuffersOnException();
-      throw ioException;
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
     }
   }
 
+  private IOException getIOExceptionForWatchForCommit(long commitIndex,
+                                                       Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    IOException ioException = new IOException(
+        "Unexpected Storage Container Exception: " + e.toString(), e);
+    releaseBuffersOnException();
+    return ioException;
+  }
+
   @VisibleForTesting
   public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
     return commitIndex2flushedDataMap;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index d9e5a1f..f938448 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -107,10 +107,13 @@ public abstract class XceiverClientSpi implements Closeable {
     try {
       XceiverClientReply reply;
       reply = sendCommandAsync(request);
-      ContainerCommandResponseProto responseProto = reply.getResponse().get();
-      return responseProto;
-    } catch (ExecutionException | InterruptedException e) {
-      throw new IOException("Failed to command " + request, e);
+      return reply.getResponse().get();
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForSendCommand(request, e);
+    } catch (ExecutionException e) {
+      throw getIOExceptionForSendCommand(request, e);
     }
   }
 
@@ -133,11 +136,19 @@ public abstract class XceiverClientSpi implements Closeable {
         function.apply(request, responseProto);
       }
       return responseProto;
-    } catch (ExecutionException | InterruptedException e) {
-      throw new IOException("Failed to command " + request, e);
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForSendCommand(request, e);
+    } catch (ExecutionException e) {
+      throw getIOExceptionForSendCommand(request, e);
     }
   }
 
+  private IOException getIOExceptionForSendCommand(
+      ContainerCommandRequestProto request, Exception e) {
+    return new IOException("Failed to execute command " + request, e);
+  }
   /**
    * Sends a given command to server gets a waitable future back.
    *
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
index ca8d870..727e903 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
@@ -134,7 +134,13 @@ public abstract class BackgroundService {
           if (LOG.isDebugEnabled()) {
             LOG.debug("task execution result size {}", result.getSize());
           }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException e) {
+          LOG.warn(
+              "Background task failed due to interruption, retrying in " +
+                  "next interval", e);
+          // Re-interrupt the thread while catching InterruptedException
+          Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
           LOG.warn(
               "Background task fails to execute, "
                   + "retrying in next interval", e);
@@ -155,6 +161,8 @@ public abstract class BackgroundService {
         exec.shutdownNow();
       }
     } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
       exec.shutdownNow();
     }
     if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
index f5e55c1..8a1c5fb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
@@ -34,7 +34,7 @@ public class Scheduler {
   private static final Logger LOG =
       LoggerFactory.getLogger(Scheduler.class);
 
-  private ScheduledExecutorService scheduler;
+  private ScheduledExecutorService scheduledExecutorService;
 
   private volatile boolean isClosed;
 
@@ -48,23 +48,24 @@ public class Scheduler {
    * @param numCoreThreads - number of core threads to maintain in the scheduler
    */
   public Scheduler(String threadName, boolean isDaemon, int numCoreThreads) {
-    scheduler = Executors.newScheduledThreadPool(numCoreThreads, r -> {
-      Thread t = new Thread(r);
-      t.setName(threadName);
-      t.setDaemon(isDaemon);
-      return t;
-    });
+    scheduledExecutorService = Executors.newScheduledThreadPool(numCoreThreads,
+        r -> {
+          Thread t = new Thread(r);
+          t.setName(threadName);
+          t.setDaemon(isDaemon);
+          return t;
+        });
     this.threadName = threadName;
     isClosed = false;
   }
 
   public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
-    scheduler.schedule(runnable, delay, timeUnit);
+    scheduledExecutorService.schedule(runnable, delay, timeUnit);
   }
 
   public void schedule(CheckedRunnable runnable, long delay,
       TimeUnit timeUnit, Logger logger, String errMsg) {
-    scheduler.schedule(() -> {
+    scheduledExecutorService.schedule(() -> {
       try {
         runnable.run();
       } catch (Throwable throwable) {
@@ -75,7 +76,7 @@ public class Scheduler {
 
   public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
       long initialDelay, long fixedDelay, TimeUnit timeUnit) {
-    return scheduler
+    return scheduledExecutorService
         .scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
   }
 
@@ -90,16 +91,18 @@ public class Scheduler {
    */
   public synchronized void close() {
     isClosed = true;
-    if (scheduler != null) {
-      scheduler.shutdownNow();
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
       try {
-        scheduler.awaitTermination(60, TimeUnit.SECONDS);
+        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         LOG.info(
             threadName + " interrupted while waiting for task completion {}",
             e);
+        // Re-interrupt the thread while catching InterruptedException
+        Thread.currentThread().interrupt();
       }
     }
-    scheduler = null;
+    scheduledExecutorService = null;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


Mime
View raw message