asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Prevent hangs on active runtime stop
Date Tue, 21 Mar 2017 23:50:45 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1608

Change subject: Prevent hangs on active runtime stop
......................................................................

Prevent hangs on active runtime stop

Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
3 files changed, 39 insertions(+), 31 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/08/1608/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 2adce1c..12fc4e9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -49,25 +49,27 @@
         }
         boolean continueIngestion = true;
         boolean failedIngestion = false;
-        while (continueIngestion) {
-            try {
-                // Start the adapter
-                adapter.start(partition, writer);
-                // Adapter has completed execution
-                continueIngestion = false;
-            } catch (Exception e) {
-                LOGGER.error("Exception during feed ingestion ", e);
-                continueIngestion = adapter.handleException(e);
-                failedIngestion = !continueIngestion;
+        try {
+            while (continueIngestion) {
+                try {
+                    // Start the adapter
+                    adapter.start(partition, writer);
+                    // Adapter has completed execution
+                    continueIngestion = false;
+                } catch (Exception e) {
+                    LOGGER.error("Exception during feed ingestion ", e);
+                    continueIngestion = adapter.handleException(e);
+                    failedIngestion = !continueIngestion;
+                }
+            }
+        } finally {
+            // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
+            // runtime manager
+            adapterManager.setFailed(failedIngestion);
+            adapterManager.setDone(true);
+            synchronized (adapterManager) {
+                adapterManager.notifyAll();
             }
         }
-        // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
-        // runtime manager
-        adapterManager.setFailed(failedIngestion);
-        adapterManager.setDone(true);
-        synchronized (adapterManager) {
-            adapterManager.notifyAll();
-        }
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 7f5372b..d950dc5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -19,11 +19,14 @@
 package org.apache.asterix.external.feed.runtime;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -51,7 +54,7 @@
     private volatile boolean failed = false;
 
     public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
-                                 IFrameWriter writer, int partition) {
+            IFrameWriter writer, int partition) {
         this.ctx = ctx;
         this.feedId = entityId;
         this.feedAdapter = feedAdapter;
@@ -63,20 +66,22 @@
         execution = ctx.getExecutorService().submit(adapterExecutor);
     }
 
-    public void stop() throws InterruptedException {
+    public void stop() throws HyracksDataException, InterruptedException {
         try {
-            if (feedAdapter.stop()) {
-                // stop() returned true, we wait for the process termination
-                execution.get();
-            } else {
-                // stop() returned false, we try to force shutdown
-                execution.cancel(true);
-            }
+            ctx.getExecutorService().submit(() -> {
+                if (!feedAdapter.stop()) {
+                    execution.get();
+                }
+                return null;
+            }).get(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+            LOGGER.log(Level.WARN, "Interrupted while trying to stop an adapter runtime", e);
             throw e;
-        } catch (Exception exception) {
-            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Exception while trying to stop an adapter runtime", e);
+            throw HyracksDataException.create(e);
+        } finally {
+            execution.cancel(true);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 3100704..590af01 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class IngestionRuntime implements IActiveRuntime {
 
@@ -50,7 +51,7 @@
     }
 
     @Override
-    public void stop() throws InterruptedException {
+    public void stop() throws InterruptedException, HyracksDataException {
         adapterRuntimeManager.stop();
         LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId);
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1608
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message