Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D12B200C4D for ; Wed, 22 Mar 2017 00:50:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1BB6E160B90; Tue, 21 Mar 2017 23:50:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3D8A8160B81 for ; Wed, 22 Mar 2017 00:50:49 +0100 (CET) Received: (qmail 3960 invoked by uid 500); 21 Mar 2017 23:50:48 -0000 Mailing-List: contact notifications-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list notifications@asterixdb.apache.org Received: (qmail 3939 invoked by uid 99); 21 Mar 2017 23:50:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Mar 2017 23:50:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id BBD211809E2 for ; Tue, 21 Mar 2017 23:50:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.92 X-Spam-Level: X-Spam-Status: No, score=0.92 tagged_above=-999 required=6.31 tests=[SPF_FAIL=0.919, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id ih9bCl1wGodW for ; Tue, 21 Mar 2017 23:50:46 +0000 (UTC) Received: from unhygienix.ics.uci.edu (unhygienix.ics.uci.edu [128.195.14.130]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 025AD5FB99 for ; Tue, 21 Mar 2017 23:50:45 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by unhygienix.ics.uci.edu (Postfix) with ESMTP id 73EA8242046; Tue, 21 Mar 2017 16:50:45 -0700 (PDT) Date: Tue, 21 Mar 2017 16:50:45 -0700 From: "abdullah alamoudi (Code Review)" Message-ID: Reply-To: bamousaa@gmail.com X-Gerrit-MessageType: newchange Subject: Change in asterixdb[master]: Prevent hangs on active runtime stop X-Gerrit-Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0 X-Gerrit-ChangeURL: X-Gerrit-Commit: 436a0d37af2dd74aa5a462c19f0e4e0eed4d3bac MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Content-Disposition: inline User-Agent: Gerrit/2.12.7 To: undisclosed-recipients:; archived-at: Tue, 21 Mar 2017 23:50:50 -0000 abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1608 Change subject: Prevent hangs on active runtime stop ...................................................................... Prevent hangs on active runtime stop Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0 --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java 3 files changed, 39 insertions(+), 31 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/08/1608/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java index 2adce1c..12fc4e9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java @@ -49,25 +49,27 @@ } boolean continueIngestion = true; boolean failedIngestion = false; - while (continueIngestion) { - try { - // Start the adapter - adapter.start(partition, writer); - // Adapter has completed execution - continueIngestion = false; - } catch (Exception e) { - LOGGER.error("Exception during feed ingestion ", e); - continueIngestion = adapter.handleException(e); - failedIngestion = !continueIngestion; + try { + while (continueIngestion) { + try { + // Start the adapter + adapter.start(partition, writer); + // Adapter has completed execution + continueIngestion = false; + } catch (Exception e) { + LOGGER.error("Exception during feed ingestion ", e); + continueIngestion = adapter.handleException(e); + failedIngestion = !continueIngestion; + } + } + } finally { + // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the + // runtime manager + adapterManager.setFailed(failedIngestion); + adapterManager.setDone(true); + synchronized (adapterManager) { + adapterManager.notifyAll(); } } - // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the - // runtime manager - adapterManager.setFailed(failedIngestion); - adapterManager.setDone(true); - synchronized (adapterManager) { - adapterManager.notifyAll(); - } } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java index 7f5372b..d950dc5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java @@ -19,11 +19,14 @@ package org.apache.asterix.external.feed.runtime; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.asterix.active.EntityId; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.log4j.Level; import org.apache.log4j.Logger; /** @@ -51,7 +54,7 @@ private volatile boolean failed = false; public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter, - IFrameWriter writer, int partition) { + IFrameWriter writer, int partition) { this.ctx = ctx; this.feedId = entityId; this.feedAdapter = feedAdapter; @@ -63,20 +66,22 @@ execution = ctx.getExecutorService().submit(adapterExecutor); } - public void stop() throws InterruptedException { + public void stop() throws HyracksDataException, InterruptedException { try { - if (feedAdapter.stop()) { - // stop() returned true, we wait for the process termination - execution.get(); - } else { - // stop() returned false, we try to force shutdown - execution.cancel(true); - } + ctx.getExecutorService().submit(() -> { + if (!feedAdapter.stop()) { + execution.get(); + } + return null; + }).get(30, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e); + LOGGER.log(Level.WARN, "Interrupted while trying to stop an adapter runtime", e); throw e; - } catch (Exception exception) { - LOGGER.error("Unable to stop adapter " + feedAdapter, exception); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Exception while trying to stop an adapter runtime", e); + throw HyracksDataException.create(e); + } finally { + execution.cancel(true); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java index 3100704..590af01 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java @@ -24,6 +24,7 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class IngestionRuntime implements IActiveRuntime { @@ -50,7 +51,7 @@ } @Override - public void stop() throws InterruptedException { + public void stop() throws InterruptedException, HyracksDataException { adapterRuntimeManager.stop(); LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1608 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi