Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm
Reply-To: dev@asterixdb.apache.org
From: amoudi@apache.org
To: commits@asterixdb.apache.org
Date: Thu, 27 Jul 2017 06:36:03 -0000
Message-Id: <37ecb2f23a6f451a94fdf93497917bd0@git.apache.org>
Subject: [9/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities
archived-at: Thu, 27 Jul 2017 06:36:01 -0000

[ASTERIXDB-1992][ING] Suspend/Resume active entities

- user model changes: wait for completion only returns when the entity becomes inactive.
- storage format changes: no
- interface changes:
  01) Introduce:
      IMetadataLockManager for entity locking.
      IActiveNotificationHandler for handling active events.
      IRetryPolicy for recovery of failed active jobs.
      IActiveEntityController for controlling active entities.
  02) IJobLifecycleListener.notifyJobFinish now passes the JobStatus and Exceptions.
  03) IActiveEntityEventsListener.isActive() returns true if the entity is active, false otherwise.
  04) IActiveEntityEventsListener.unregister() removes the listener upon entity deletion.
  05) IActiveEntityEventsListener.getJobFailure() returns the job failure if the entity is in a failed state.
  06) IStatementExecutor.getComponentProvider() returns the storage component provider.
  07) IStatementExecutor.getApplicationContext() returns the application context.
  08) IMetadataManager.upsertEntity to perform metadata entities' upsert operations.
  09) IMetadataNode.upsertEntity to perform metadata entities' upsert operations.
  10) ICcApplicationContext.getMetadataLockManager() returns the lock manager.
  11) ICcApplicationContext.getClusterStateManager() returns the cluster state manager.

details:
- Starting and stopping of active entities now go through their listeners,
  rather than the presence of a listener indicating an active entity and
  its absence indicating an inactive one. This facilitates suspend/resume
  operations of long running active jobs for the sake of DDL operations
  or topology changes.
- Unit tests for the vast majority of code paths and different possible
  scenarios have been added.
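As a rough illustration of the suspend/resume idea described above, the following sketch (not code from this commit) models a listener that suspends a running entity around a DDL-like action and resumes it afterwards. SketchState, SketchListener and runWhileSuspended are hypothetical names; only the state names echo the ActivityState values in this change, and the transitions and the isActive() rule are simplified assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced state set; the names echo ActivityState but the
// failure and recovery states are left out to keep the sketch short.
enum SketchState {
    STOPPED, STARTING, RUNNING, STOPPING, SUSPENDING, SUSPENDED, RESUMING
}

// Hypothetical stand-in for an active entity listener; not an AsterixDB class.
final class SketchListener {
    private SketchState state = SketchState.STOPPED;
    private final List<SketchState> history = new ArrayList<>();

    private void moveTo(SketchState next) {
        state = next;
        history.add(next);
    }

    private void require(SketchState expected, String action) {
        if (state != expected) {
            throw new IllegalStateException("cannot " + action + " from " + state);
        }
    }

    synchronized void start() {
        require(SketchState.STOPPED, "start");
        moveTo(SketchState.STARTING); // a real system would submit the job here
        moveTo(SketchState.RUNNING);
    }

    synchronized void stop() {
        require(SketchState.RUNNING, "stop");
        moveTo(SketchState.STOPPING);
        moveTo(SketchState.STOPPED);
    }

    synchronized void suspend() {
        require(SketchState.RUNNING, "suspend");
        moveTo(SketchState.SUSPENDING);
        moveTo(SketchState.SUSPENDED);
    }

    synchronized void resume() {
        require(SketchState.SUSPENDED, "resume");
        moveTo(SketchState.RESUMING);
        moveTo(SketchState.RUNNING);
    }

    // Assumption for this sketch: every state except STOPPED counts as active.
    synchronized boolean isActive() {
        return state != SketchState.STOPPED;
    }

    synchronized SketchState getState() {
        return state;
    }

    synchronized List<SketchState> getHistory() {
        return new ArrayList<>(history);
    }

    // Runs a DDL-like action while the entity is suspended, then resumes it
    // even if the action throws. An entity that was not running is left alone.
    synchronized void runWhileSuspended(Runnable ddl) {
        boolean wasRunning = state == SketchState.RUNNING;
        if (wasRunning) {
            suspend();
        }
        try {
            ddl.run();
        } finally {
            if (wasRunning) {
                resume();
            }
        }
    }
}
```

Because the start, stop, suspend and resume calls all go through the same object, a caller such as a DDL statement only needs the listener to pause and restart the job; the history list is there purely to make the transitions observable in a test.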

Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1875
Tested-by: Jenkins
BAD: Jenkins
Integration-Tests: Jenkins
Reviewed-by: Murtadha Hubail

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a85f4121
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a85f4121
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a85f4121

Branch: refs/heads/master
Commit: a85f4121fd5ed6ec7f985d84da1181ec74201d9e
Parents: d57d81f
Author: Abdullah Alamoudi
Authored: Wed Jul 26 21:35:47 2017 -0700
Committer: abdullah alamoudi
Committed: Wed Jul 26 23:35:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/asterix/active/ActiveEvent.java | 12 +-
 .../active/ActiveJobNotificationHandler.java | 169 ---
 .../asterix/active/ActiveLifecycleListener.java | 86 --
 .../asterix/active/ActiveRuntimeManager.java | 15 +-
 .../apache/asterix/active/ActivityState.java | 39 +-
 .../apache/asterix/active/CountRetryPolicy.java | 39 +
 .../asterix/active/CountRetryPolicyFactory.java | 34 +
 .../active/IActiveEntityEventSubscriber.java | 57 +
 .../active/IActiveEntityEventsListener.java | 19 +-
 .../asterix/active/IActiveEventSubscriber.java | 62 -
 .../active/IActiveNotificationHandler.java | 93 ++
 .../org/apache/asterix/active/IRetryPolicy.java | 27 +
 .../asterix/active/IRetryPolicyFactory.java | 27 +
 .../asterix/active/InfiniteRetryPolicy.java | 42 +
 .../active/InfiniteRetryPolicyFactory.java | 28 +
 .../asterix/active/NoRetryPolicyFactory.java | 32 +
 .../active/SingleThreadEventProcessor.java | 78 +
 .../active/message/ActivePartitionMessage.java | 8 +-
 .../physical/InvertedIndexPOperator.java | 2 +-
 .../translator/AbstractLangTranslator.java | 20 +-
 .../LangExpressionToPlanTranslator.java | 2 +-
 .../api/http/server/ActiveStatsApiServlet.java | 14 +-
 .../api/http/server/ConnectorApiServlet.java | 3 +-
 .../api/http/server/RebalanceApiServlet.java | 33 +-
 .../app/active/ActiveEntityEventsListener.java | 659 ++++++++
 .../app/active/ActiveNotificationHandler.java | 282 ++++
 .../asterix/app/active/FeedEventsListener.java | 149 ++
 .../DefaultStatementExecutorFactory.java | 3 +-
 .../asterix/app/translator/QueryTranslator.java | 359 ++---
 .../hyracks/bootstrap/CCApplication.java | 26 +-
 .../bootstrap/GlobalRecoveryManager.java | 136 +-
 .../org/apache/asterix/utils/RebalanceUtil.java | 72 +-
 .../http/servlet/ConnectorApiServletTest.java | 6 +-
 .../app/bootstrap/TestNodeController.java | 5 +-
 .../org/apache/asterix/test/active/Action.java | 59 +
 .../test/active/ActiveEventsListenerTest.java | 1432 ++++++++++++++++++
 .../asterix/test/active/ActiveStatsTest.java | 61 +-
 .../org/apache/asterix/test/active/Actor.java | 37 +
 .../test/active/DummyFeedEventsListener.java | 74 +
 .../test/active/TestClusterControllerActor.java | 85 ++
 .../asterix/test/active/TestEventsListener.java | 202 +++
 .../test/active/TestNodeControllerActor.java | 67 +
 .../asterix/test/active/TestUserActor.java | 287 ++++
 .../asterix/test/common/TestExecutor.java | 9 +-
 .../src/test/resources/runtimets/testsuite.xml | 6 +-
 .../common/api/IClusterEventsSubscriber.java | 2 +-
 .../common/api/IMetadataLockManager.java | 311 ++++
 .../common/cluster/IGlobalRecoveryManager.java | 15 +-
 .../common/dataflow/ICcApplicationContext.java | 16 +-
 .../common/exceptions/AsterixException.java | 21 +-
 .../common/exceptions/CompilationException.java | 3 +-
 .../asterix/common/exceptions/ErrorCode.java | 27 +
 .../common/exceptions/MetadataException.java | 77 +
 .../common/exceptions/RuntimeDataException.java | 3 +-
 .../asterix/common/metadata/IDataset.java | 11 +-
 .../asterix/common/metadata/IMetadataLock.java | 93 ++
 .../asterix/common/metadata/LockList.java | 128 ++
 .../main/resources/asx_errormsg/en.properties | 33 +-
 .../external/api/IDataSourceAdapter.java | 2 +
 .../AbstractFeedDataFlowController.java | 4 +
 .../dataflow/FeedRecordDataFlowController.java | 49 +-
 .../external/dataflow/FeedTupleForwarder.java | 11 +-
 .../external/dataset/adapter/FeedAdapter.java | 4 +
 .../management/ActiveEntityEventsListener.java | 248 ---
 .../external/feed/runtime/AdapterExecutor.java | 4 +
 .../external/feed/watch/AbstractSubscriber.java | 37 +-
 .../external/feed/watch/NoOpSubscriber.java | 9 +-
 .../external/feed/watch/StatsSubscriber.java | 12 +-
 .../feed/watch/WaitForStateSubscriber.java | 32 +-
 .../aql/statement/SubscribeFeedStatement.java | 2 +-
 .../common/statement/ConnectFeedStatement.java | 5 +-
 .../apache/asterix/metadata/MetadataCache.java | 15 +-
 .../asterix/metadata/MetadataException.java | 44 -
 .../asterix/metadata/MetadataManager.java | 131 +-
 .../apache/asterix/metadata/MetadataNode.java | 214 ++-
 .../metadata/api/IActiveEntityController.java | 107 ++
 .../api/IMetadataEntityTupleTranslator.java | 2 +-
 .../asterix/metadata/api/IMetadataManager.java | 12 +-
 .../asterix/metadata/api/IMetadataNode.java | 14 +-
 .../asterix/metadata/api/IValueExtractor.java | 2 +-
 .../metadata/bootstrap/MetadataBootstrap.java | 17 +-
 .../metadata/declared/MetadataManagerUtil.java | 2 +-
 .../metadata/declared/MetadataProvider.java | 71 +-
 .../metadata/entities/BuiltinTypeMap.java | 2 +-
 .../asterix/metadata/entities/Dataset.java | 42 +-
 .../metadata/entities/FeedConnection.java | 2 -
 .../apache/asterix/metadata/entities/Index.java | 5 +
 .../CompactionPolicyTupleTranslator.java | 2 +-
 .../DatasetTupleTranslator.java | 2 +-
 .../DatasourceAdapterTupleTranslator.java | 2 +-
 .../DatatypeTupleTranslator.java | 2 +-
 .../DataverseTupleTranslator.java | 2 +-
 .../ExternalFileTupleTranslator.java | 2 +-
 .../FeedConnectionTupleTranslator.java | 2 +-
 .../FeedPolicyTupleTranslator.java | 2 +-
 .../FeedTupleTranslator.java | 2 +-
 .../FunctionTupleTranslator.java | 2 +-
 .../IndexTupleTranslator.java | 2 +-
 .../LibraryTupleTranslator.java | 2 +-
 .../NodeGroupTupleTranslator.java | 2 +-
 .../NodeTupleTranslator.java | 2 +-
 .../metadata/feeds/BuiltinFeedPolicies.java | 61 +-
 .../metadata/feeds/FeedMetadataUtil.java | 7 +-
 .../functions/ExternalFunctionCompilerUtil.java | 2 +-
 .../asterix/metadata/lock/DatasetLock.java | 152 +-
 .../asterix/metadata/lock/IMetadataLock.java | 57 -
 .../apache/asterix/metadata/lock/LockList.java | 79 -
 .../asterix/metadata/lock/MetadataLock.java | 11 +-
 .../metadata/lock/MetadataLockManager.java | 213 +--
 .../asterix/metadata/utils/DatasetUtil.java | 48 +-
 .../metadata/utils/MetadataLockUtil.java | 168 ++
 .../asterix/metadata/utils/MetadataUtil.java | 5 +
 .../utils/SplitsAndConstraintsUtil.java | 18 +-
 .../MetadataEntityValueExtractor.java | 2 +-
 .../NestedDatatypeNameValueExtractor.java | 2 +-
 .../TupleCopyValueExtractor.java | 2 +-
 .../DatasetTupleTranslatorTest.java | 2 +-
 .../IndexTupleTranslatorTest.java | 2 +-
 .../runtime/utils/CcApplicationContext.java | 18 +-
 .../runtime/utils/ClusterStateManager.java | 27 +-
 .../asterix/tools/datagen/AdmDataGen.java | 2 +-
 .../tools/translator/ADGenDmlTranslator.java | 2 +-
 .../opcallbacks/UpsertOperationCallback.java | 2 +-
 .../api/dataset/IDatasetPartitionManager.java | 18 +-
 .../api/exceptions/HyracksDataException.java | 11 +-
 .../hyracks/api/job/IJobLifecycleListener.java | 32 +-
 .../control/cc/ClusterControllerService.java | 21 +-
 .../cc/application/CCServiceContext.java | 9 +-
 .../hyracks/control/cc/cluster/NodeManager.java | 37 +-
 .../cc/dataset/DatasetDirectoryService.java | 7 +-
 .../hyracks/control/cc/job/JobManager.java | 15 +-
 .../control/cc/work/RegisterNodeWork.java | 4 +
 .../control/cc/cluster/NodeManagerTest.java | 6 +-
 .../control/common/utils/ExceptionUtils.java | 9 +-
 .../control/nc/NodeControllerService.java | 54 +-
 .../nc/dataset/DatasetPartitionManager.java | 22 +-
 .../nc/partitions/PipelinedPartition.java | 19 +-
 .../control/nc/work/AbortAllTasksWork.java | 59 +
 .../hyracks/control/nc/work/AbortTasksWork.java | 4 +-
 .../dataflow/common/utils/FrameDebugUtils.java | 56 +-
 .../NonDeterministicChannelReader.java | 7 +-
 141 files changed, 6211 insertions(+), 1887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index 1141912..a810576 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -30,7 +30,9 @@ public class ActiveEvent {
         JOB_FINISHED,
         PARTITION_EVENT,
         EXTENSION_EVENT,
-        STATS_UPDATED
+        STATS_UPDATED,
+        STATE_CHANGED,
+        FAILURE
     }
 
     private final JobId jobId;
@@ -45,10 +47,6 @@ public class ActiveEvent {
         this.eventObject = eventObject;
     }
 
-    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
-        this(jobId, eventKind, entityId, null);
-    }
-
     public JobId getJobId() {
         return jobId;
     }
@@ -79,8 +77,8 @@ public class ActiveEvent {
             return true;
         }
         ActiveEvent other = (ActiveEvent) o;
-        return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind) && Objects
-                .equals(eventObject, other.eventObject);
+        return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind)
+                && Objects.equals(eventObject, other.eventObject);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java deleted file mode 100644 index d2b8a89..0000000 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.active; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveEvent.Kind; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class ActiveJobNotificationHandler implements Runnable { - public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; - private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName()); - private static final boolean DEBUG = false; - private final LinkedBlockingQueue eventInbox; - private final Map entityEventListeners; - private final Map jobId2ActiveJobInfos; - - public ActiveJobNotificationHandler() { - this.eventInbox = new LinkedBlockingQueue<>(); - this.jobId2ActiveJobInfos = new HashMap<>(); - this.entityEventListeners = new HashMap<>(); - } - - @Override - public void run() { - Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName()); - LOGGER.log(Level.INFO, "Started " + ActiveJobNotificationHandler.class.getSimpleName()); - while (!Thread.interrupted()) { - try { - ActiveEvent event = getEventInbox().take(); - EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId()); - if (entityId != null) { - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind()); - if (event.getEventKind() == Kind.JOB_FINISHED) { - LOGGER.log(Level.FINER, "Removing the job"); - jobId2ActiveJobInfos.remove(event.getJobId()); - } - if (listener != null) { - LOGGER.log(Level.FINER, "Notifying the listener"); - listener.notify(event); - } - - } else { - LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId()); - } - } catch (InterruptedException e) { - 
Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Error handling an active job event", e); - } - } - LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName()); - } - - public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException { - LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); - unregisterListener(listener); - } - - public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) { - if (DEBUG) { - LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - LOGGER.log(Level.WARNING, "Listener found: " + listener); - } - return entityEventListeners.get(entityId); - } - - public EntityId getEntity(JobId jobId) { - return jobId2ActiveJobInfos.get(jobId); - } - - public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) { - LOGGER.log(Level.FINER, - "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId); - Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); - if (property == null || !(property instanceof EntityId)) { - LOGGER.log(Level.FINER, "Job was is not active. property found to be: " + property); - return; - } - EntityId entityId = (EntityId) property; - monitorJob(jobId, entityId); - boolean found = jobId2ActiveJobInfos.get(jobId) != null; - LOGGER.log(Level.FINER, "Job was found to be: " + (found ? 
"Active" : "Inactive")); - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - if (listener != null) { - // It is okay to bypass the event inbox in this case because we know this is the first event for this entity - listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); - } - LOGGER.log(Level.FINER, "Listener was notified" + jobId); - } - - public LinkedBlockingQueue getEventInbox() { - return eventInbox; - } - - public synchronized IActiveEntityEventsListener[] getEventListeners() { - if (DEBUG) { - LOGGER.log(Level.WARNING, "getEventListeners() was called"); - LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners"); - } - return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]); - } - - public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException { - if (DEBUG) { - LOGGER.log(Level.FINER, "registerListener(IActiveEntityEventsListener listener) was called for the entity " - + listener.getEntityId()); - } - if (entityEventListeners.containsKey(listener.getEntityId())) { - throw new HyracksDataException( - "Active Entity Listener " + listener.getEntityId() + " is already registered"); - } - entityEventListeners.put(listener.getEntityId(), listener); - } - - public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException { - LOGGER.log(Level.FINER, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " - + listener.getEntityId()); - IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); - if (registeredListener == null) { - throw new HyracksDataException( - "Active Entity Listener " + listener.getEntityId() + " hasn't been registered"); - } - } - - public synchronized void monitorJob(JobId jobId, EntityId activeJob) { - if (DEBUG) { - LOGGER.log(Level.WARNING, 
"monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); - boolean found = jobId2ActiveJobInfos.get(jobId) != null; - LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive")); - } - if (entityEventListeners.containsKey(activeJob)) { - if (jobId2ActiveJobInfos.containsKey(jobId)) { - LOGGER.severe("Job is already being monitored for job: " + jobId); - return; - } - if (DEBUG) { - LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId); - } - } else { - LOGGER.severe("No listener was found for the entity: " + activeJob); - } - jobId2ActiveJobInfos.put(jobId, activeJob); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java deleted file mode 100644 index 86c3e7d..0000000 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.active; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveEvent.Kind; -import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.IJobLifecycleListener; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class ActiveLifecycleListener implements IJobLifecycleListener { - - private static final Logger LOGGER = Logger.getLogger(ActiveLifecycleListener.class.getName()); - - private final ActiveJobNotificationHandler notificationHandler; - private final LinkedBlockingQueue jobEventInbox; - private final ExecutorService executorService; - - public ActiveLifecycleListener() { - notificationHandler = new ActiveJobNotificationHandler(); - jobEventInbox = notificationHandler.getEventInbox(); - executorService = Executors.newSingleThreadExecutor(); - executorService.execute(notificationHandler); - } - - @Override - public synchronized void notifyJobStart(JobId jobId) throws HyracksException { - EntityId entityId = notificationHandler.getEntity(jobId); - if (entityId != null) { - jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId)); - } - } - - @Override - public synchronized void notifyJobFinish(JobId jobId) throws HyracksException { - EntityId entityId = notificationHandler.getEntity(jobId); - if (entityId != null) { - jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId)); - } else { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); - } - } - } - - @Override - public void notifyJobCreation(JobId jobId, JobSpecification 
spec) throws HyracksException { - notificationHandler.notifyJobCreation(jobId, spec); - } - - public void receive(ActivePartitionMessage message) { - jobEventInbox.add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT, - message.getActiveRuntimeId().getEntityId(), message)); - } - - public void stop() { - executorService.shutdown(); - } - - public ActiveJobNotificationHandler getNotificationHandler() { - return notificationHandler; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java index e71367a..18368ae 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java @@ -28,6 +28,10 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.hyracks.api.exceptions.HyracksDataException; + public class ActiveRuntimeManager { private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName()); @@ -65,11 +69,18 @@ public class ActiveRuntimeManager { return activeRuntimes.get(runtimeId); } - public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime) { + public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime) + throws HyracksDataException { + if (activeRuntimes.containsKey(runtimeId)) { + throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, 
runtimeId); + } activeRuntimes.put(runtimeId, feedRuntime); } - public synchronized void deregisterRuntime(ActiveRuntimeId runtimeId) { + public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException { + if (!activeRuntimes.containsKey(runtimeId)) { + throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId); + } activeRuntimes.remove(runtimeId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java index af8f5ca..eb43d10 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java @@ -20,27 +20,44 @@ package org.apache.asterix.active; public enum ActivityState { /** - * The initial state of an activity. + * The starting state and a possible terminal state. */ - CREATED, + STOPPED, /** - * The starting state and a possible terminal state. Next state can only be {@code ActivityState.STARTING} + * Failure to recover from a temporary faliure caused the activity to fail permanantly. + * No further recovery attempts will be made. */ - STOPPED, + PERMANENTLY_FAILED, /** - * A terminal state + * An unexpected failure caused the activity to fail but recovery attempts will start taking place */ - FAILED, + TEMPORARILY_FAILED, /** - * An intermediate state. Next state can only be {@code ActivityState.STARTED} or {@code ActivityState.FAILED} + * Attempting to recover from temporary failure. + */ + RECOVERING, + /** + * During an attempt to start the activity */ STARTING, /** - * An intermediate state. 
Next state can only be {@code ActivityState.STOPPING} or {@code ActivityState.FAILED} + * The activity has been started successfully and is running + */ + RUNNING, + /** + * During an attempt to gracefully stop the activity + */ + STOPPING, + /** + * During an attempt to gracefully suspend the activity + */ + SUSPENDING, + /** + * The activitiy has been suspended successfully. Next state must be resuming */ - STARTED, + SUSPENDED, /** - * An intermediate state. Next state can only be {@code ActivityState.STOPPED} or {@code ActivityState.FAILED} + * During an attempt to restart the activity after suspension */ - STOPPING + RESUMING } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java new file mode 100644 index 0000000..6633810 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active; + +public class CountRetryPolicy implements IRetryPolicy { + + private final int count; + private int attempted = 0; + + public CountRetryPolicy(int count) { + this.count = count; + } + + @Override + public boolean retry() { + if (attempted < count) { + attempted++; + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java new file mode 100644 index 0000000..5e26ae4 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicyFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.asterix.active;
+
+public class CountRetryPolicyFactory implements IRetryPolicyFactory {
+
+    private final int count;
+
+    public CountRetryPolicyFactory(int count) {
+        this.count = count;
+    }
+
+    @Override
+    public IRetryPolicy create(IActiveEntityEventsListener listener) {
+        return new CountRetryPolicy(count);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
new file mode 100644
index 0000000..f9357b4
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.asterix.active; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * An active event subscriber that subscribes to events related to active entity + */ +public interface IActiveEntityEventSubscriber { + + /** + * Notifies the subscriber of a new event + * + * @param event + * @throws HyracksDataException + */ + void notify(ActiveEvent event) throws HyracksDataException; + + /** + * Checkcs whether the subscriber is done receiving events + * + * @return + */ + boolean isDone(); + + /** + * Wait until the terminal event has been received + * + * @throws Exception + */ + void sync() throws HyracksDataException, InterruptedException; + + /** + * callback upon successful subscription + * + * @param eventsListener + * @throws HyracksDataException + */ + void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index 4bc02f3..03b0cfc 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -58,7 +58,7 @@ public interface IActiveEntityEventsListener { * @param subscriber * @throws HyracksDataException */ - void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException; + void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException; /** * The most recent acquired stats for the active entity @@ -79,4 +79,21 @@ public interface IActiveEntityEventsListener { * 
@throws HyracksDataException */ void refreshStats(long timeout) throws HyracksDataException; + + /** + * @return true, if entity is active, false otherwise + */ + boolean isActive(); + + /** + * unregister the listener upon deletion of entity + * + * @throws HyracksDataException + */ + void unregister() throws HyracksDataException; + + /** + * Get the job failure for the last failed run + */ + Exception getJobFailure(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java deleted file mode 100644 index 69f7f1c..0000000 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.active; - -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * An active event subscriber that subscribe to events related to active entity - */ -public interface IActiveEventSubscriber { - - /** - * Notify the subscriber of a new event - * - * @param event - * @throws HyracksDataException - */ - void notify(ActiveEvent event) throws HyracksDataException; - - /** - * Checkcs whether the subscriber is done receiving events - * - * @return - */ - boolean isDone(); - - /** - * Wait until the terminal event has been received - * - * @throws InterruptedException - */ - void sync() throws InterruptedException; - - /** - * Stop watching events - */ - void unsubscribe(); - - /** - * callback upon successful subscription - * - * @param eventsListener - * @throws HyracksDataException - */ - void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException; -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java new file mode 100644 index 0000000..8b9f232 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveNotificationHandler.java @@ -0,0 +1,93 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active; + +import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobLifecycleListener; + +/** + * Represents the notification handler for events of active entity jobs + */ +public interface IActiveNotificationHandler { + + /** + * Recover all active jobs that failed + * + * @throws HyracksDataException + */ + void recover() throws HyracksDataException; + + /** + * Set whether handler initialization has completed or not + * + * @param initialized + * @throws HyracksDataException + */ + void setInitialized(boolean initialized) throws HyracksDataException; + + /** + * @return true if initialization has completed, false otherwise + */ + boolean isInitialized(); + + /** + * Register a listener for events of an active entity + * + * @param listener + * the listener to register + * @throws HyracksDataException + * if the active entity already has a listener associated with it + */ + void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException; + + /** + * Unregister a listener for events of an active entity + * + * @param listener + * the listener to unregister + * @throws HyracksDataException + * if the entity is still active or if the listener was not registered + */ + void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException; + + /** + * @return all the registered event listeners + */ + IActiveEntityEventsListener[] getEventListeners(); + + /** + * 
Lookup an event listener using the entity id + * + * @param entityId + * the lookup key + * @return the registered listener if found, null otherwise + */ + IActiveEntityEventsListener getListener(EntityId entityId); + + /** + * Recieves an active job message from an nc + * + * @param message + * the message + */ + void receive(ActivePartitionMessage message); + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java new file mode 100644 index 0000000..a010984 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active; + +@FunctionalInterface +public interface IRetryPolicy { + /** + * @return true if one more attempt should be done + */ + boolean retry(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java new file mode 100644 index 0000000..a946337 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active; + +@FunctionalInterface +public interface IRetryPolicyFactory { + /** + * @return an instance of retry policy + */ + IRetryPolicy create(IActiveEntityEventsListener listener); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java new file mode 100644 index 0000000..074f8f4 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active; + +public class InfiniteRetryPolicy implements IRetryPolicy { + + private final IActiveEntityEventsListener listener; + + public InfiniteRetryPolicy(IActiveEntityEventsListener listener) { + this.listener = listener; + } + + @Override + public boolean retry() { + synchronized (listener) { + try { + listener.wait(5000); //NOSONAR this method is being called in a while loop + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java new file mode 100644 index 0000000..b31d245 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicyFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active; + +public class InfiniteRetryPolicyFactory implements IRetryPolicyFactory { + + @Override + public IRetryPolicy create(IActiveEntityEventsListener listener) { + return new InfiniteRetryPolicy(listener); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java new file mode 100644 index 0000000..1596c17 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active; + +public class NoRetryPolicyFactory implements IRetryPolicyFactory { + public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory(); + private static final IRetryPolicy policy = () -> false; + + private NoRetryPolicyFactory() { + } + + @Override + public IRetryPolicy create(IActiveEntityEventsListener listener) { + return policy; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java new file mode 100644 index 0000000..0a36216 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public abstract class SingleThreadEventProcessor<T> implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(SingleThreadEventProcessor.class.getName()); + private final String name; + private final LinkedBlockingQueue<T> eventInbox; + private final ExecutorService executorService; + private final Future future; + + public SingleThreadEventProcessor(String threadName) { + this.name = threadName; + eventInbox = new LinkedBlockingQueue<>(); + executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName)); + future = executorService.submit(this); + } + + @Override + public final void run() { + LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName()); + while (!Thread.currentThread().isInterrupted()) { + try { + T event = eventInbox.take(); + handle(event); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error handling an event", e); + } + } + LOGGER.log(Level.WARNING, "Stopped " + Thread.currentThread().getName()); + } + + protected abstract void handle(T event) throws Exception; //NOSONAR + + public void add(T event) { + if (!eventInbox.add(event)) { + throw new IllegalStateException(); + } + } + + public void stop() throws HyracksDataException, InterruptedException { + future.cancel(true); + executorService.shutdown(); + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name); + } + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index 9391044..a47d5a5 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -21,8 +21,8 @@ package org.apache.asterix.active.message; import java.io.Serializable; import java.util.Objects; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.IActiveNotificationHandler; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -64,7 +64,7 @@ public class ActivePartitionMessage implements ICcAddressedMessage { @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); + IActiveNotificationHandler activeListener = (IActiveNotificationHandler) appCtx.getActiveNotificationHandler(); activeListener.receive(this); } @@ -87,7 +87,7 @@ public class ActivePartitionMessage implements ICcAddressedMessage { return true; } ActivePartitionMessage other = (ActivePartitionMessage) o; - return Objects.equals(other.activeRuntimeId, activeRuntimeId) && Objects.equals(other.jobId, jobId) && Objects - .equals(other.payload, payload); + return Objects.equals(other.activeRuntimeId, 
activeRuntimeId) && Objects.equals(other.jobId, jobId) + && Objects.equals(other.payload, payload); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java index 213c60b..cd0a63c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.algebra.operators.physical; -import org.apache.asterix.metadata.MetadataException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.declared.DataSourceId; import org.apache.asterix.metadata.declared.MetadataProvider; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java index eee6bdc..1d47095 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java @@ -25,6 +25,8 @@ import java.util.logging.Level; import java.util.logging.Logger; 
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.lang.common.base.Statement; @@ -51,11 +53,13 @@ public abstract class AbstractLangTranslator { public void validateOperation(ICcApplicationContext appCtx, Dataverse defaultDataverse, Statement stmt) throws AsterixException { - if (!(ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE) - && ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted())) { + final IClusterStateManager clusterStateManager = ClusterStateManager.INSTANCE; + final IGlobalRecoveryManager globalRecoveryManager = appCtx.getGlobalRecoveryManager(); + if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE) + && globalRecoveryManager.isRecoveryCompleted())) { int maxWaitCycles = appCtx.getExternalProperties().getMaxWaitClusterActive(); try { - ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS); + clusterStateManager.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS); } catch (HyracksDataException e) { throw new AsterixException(e); } catch (InterruptedException e) { @@ -64,7 +68,7 @@ public abstract class AbstractLangTranslator { } Thread.currentThread().interrupt(); } - if (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)) { + if (!clusterStateManager.getState().equals(ClusterState.ACTIVE)) { throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state." 
+ "\n One or more Node Controllers have left or haven't joined yet.\n"); } else { @@ -74,16 +78,16 @@ public abstract class AbstractLangTranslator { } } - if (ClusterStateManager.INSTANCE.getState().equals(ClusterState.UNUSABLE)) { + if (clusterStateManager.getState().equals(ClusterState.UNUSABLE)) { throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state." + "\n One or more Node Controllers have left.\n"); } - if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) { + if (!globalRecoveryManager.isRecoveryCompleted()) { int maxWaitCycles = appCtx.getExternalProperties().getMaxWaitClusterActive(); int waitCycleCount = 0; try { - while (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted() && waitCycleCount < maxWaitCycles) { + while (!globalRecoveryManager.isRecoveryCompleted() && waitCycleCount < maxWaitCycles) { Thread.sleep(1000); waitCycleCount++; } @@ -93,7 +97,7 @@ public abstract class AbstractLangTranslator { } Thread.currentThread().interrupt(); } - if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) { + if (!globalRecoveryManager.isRecoveryCompleted()) { throw new AsterixException("Cluster Global recovery is not yet complete and the system is in " + ClusterState.ACTIVE + " state"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index b8a6d8a..a1c5cf4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -36,6 +36,7 @@ 
import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionConstants; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.util.ExternalDataUtils; @@ -72,7 +73,6 @@ import org.apache.asterix.lang.common.struct.OperatorType; import org.apache.asterix.lang.common.struct.QuantifiedPair; import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.lang.common.visitor.base.AbstractQueryExpressionVisitor; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.declared.DataSource; import org.apache.asterix.metadata.declared.DataSourceId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java index 593faa6..15f0ace 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java @@ -18,15 +18,13 @@ */ package org.apache.asterix.api.http.server; -import java.io.IOException; import java.io.PrintWriter; import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import io.netty.handler.codec.http.HttpResponseStatus; -import 
org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.feed.watch.StatsSubscriber; import org.apache.hyracks.http.api.IServletRequest; @@ -38,19 +36,21 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.handler.codec.http.HttpResponseStatus; + public class ActiveStatsApiServlet extends AbstractServlet { private static final Logger LOGGER = Logger.getLogger(ActiveStatsApiServlet.class.getName()); private static final int DEFAULT_EXPIRE_TIME = 2000; - private final ActiveLifecycleListener activeLifecycleListener; + private final ActiveNotificationHandler activeNotificationHandler; public ActiveStatsApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx) { super(ctx, paths); - this.activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); + this.activeNotificationHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); } private JsonNode constructNode(ObjectMapper om, IActiveEntityEventsListener eventListener, long currentTime, - long ttl) throws InterruptedException, IOException { + long ttl) throws Exception { long statsTimeStamp = eventListener.getStatsTimeStamp(); if (currentTime - statsTimeStamp > ttl) { StatsSubscriber subscriber = new StatsSubscriber(eventListener); @@ -66,7 +66,7 @@ public class ActiveStatsApiServlet extends AbstractServlet { // Obtain all feed status String localPath = localPath(request); int expireTime; - IActiveEntityEventsListener[] listeners = activeLifecycleListener.getNotificationHandler().getEventListeners(); + IActiveEntityEventsListener[] listeners = activeNotificationHandler.getEventListeners(); ObjectMapper om 
= new ObjectMapper(); om.enable(SerializationFeature.INDENT_OUTPUT); ObjectNode resNode = om.createObjectNode(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java index 03958ed..d9a63f7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java @@ -29,7 +29,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -93,7 +92,7 @@ public class ConnectorApiServlet extends AbstractServlet { MetadataManager.INSTANCE.init(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); // Retrieves file splits of the dataset. 
-            MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
+            MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
             try {
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index e9d231a..27e2806 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -36,8 +36,9 @@ import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -72,7 +73,7 @@ public class RebalanceApiServlet extends AbstractServlet {
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
     // A queue that maintains submitted rebalance requests.
-    private final Queue<Future> rebalanceTasks = new ArrayDeque<>();
+    private final Queue<Future<Void>> rebalanceTasks = new ArrayDeque<>();
 
     // A queue that tracks the termination of rebalance threads.
     private final Queue rebalanceFutureTerminated = new ArrayDeque<>();
@@ -141,7 +142,7 @@ public class RebalanceApiServlet extends AbstractServlet {
 
     // Cancels all rebalance tasks.
     private synchronized void cancelRebalance() throws InterruptedException {
-        for (Future rebalanceTask : rebalanceTasks) {
+        for (Future<Void> rebalanceTask : rebalanceTasks) {
             rebalanceTask.cancel(true);
         }
     }
@@ -156,14 +157,15 @@ public class RebalanceApiServlet extends AbstractServlet {
     private synchronized CountDownLatch scheduleRebalance(String dataverseName, String datasetName,
             String[] targetNodes, IServletResponse response) {
         CountDownLatch terminated = new CountDownLatch(1);
-        Future task = executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
+        Future<Void> task =
+                executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
         rebalanceTasks.add(task);
         rebalanceFutureTerminated.add(terminated);
         return terminated;
     }
 
     // Performs the actual rebalance.
-    private void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response,
+    private Void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response,
             CountDownLatch terminated) {
         try {
             // Sets the content type.
@@ -198,6 +200,7 @@ public class RebalanceApiServlet extends AbstractServlet {
             // Notify that the rebalance task is terminated.
             terminated.countDown();
         }
+        return null;
     }
 
     // Lists all datasets that should be rebalanced in a given datavserse.
@@ -241,9 +244,23 @@ public class RebalanceApiServlet extends AbstractServlet {
     // Rebalances a given dataset.
     private void rebalanceDataset(String dataverseName, String datasetName, String[] targetNodes) throws Exception {
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
-        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
-        RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
-                metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        try {
+            ActiveNotificationHandler activeNotificationHandler =
+                    (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+            activeNotificationHandler.suspend(metadataProvider);
+            try {
+                IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                        dataverseName + '.' + datasetName);
+                RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
+                        metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
+            } finally {
+                activeNotificationHandler.resume(metadataProvider);
+            }
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
     }
 
     // Sends HTTP response to the request client.
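
Note on the rebalanceDataset change above: the ordering of the nested try/finally blocks (suspend, lock, work, resume, unlock) can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the AsterixDB implementation: ActiveHandler, LockList, runExclusive and demo are hypothetical stand-ins for ActiveNotificationHandler, the metadata lock list and the servlet method, and the dataset name is made up.

```java
import java.util.ArrayList;
import java.util.List;

final class SuspendResumeSketch {
    // Hypothetical stand-in for the suspend/resume part of the notification handler.
    interface ActiveHandler {
        void suspend() throws Exception;

        void resume() throws Exception;
    }

    // Hypothetical stand-in for the lock list held by the metadata provider.
    interface LockList {
        void acquire(String name);

        void unlock();
    }

    // Mirrors the nesting in the diff: resume runs in the inner finally,
    // unlock runs in the outer finally, so both happen even if the work throws.
    static void runExclusive(ActiveHandler handler, LockList locks, String datasetName, Runnable work)
            throws Exception {
        try {
            handler.suspend();
            try {
                locks.acquire(datasetName);
                work.run();
            } finally {
                handler.resume();
            }
        } finally {
            locks.unlock();
        }
    }

    // Records the order of the calls; optionally fails inside the work step.
    static List<String> demo(boolean failDuringWork) {
        List<String> events = new ArrayList<>();
        ActiveHandler handler = new ActiveHandler() {
            public void suspend() {
                events.add("suspend");
            }

            public void resume() {
                events.add("resume");
            }
        };
        LockList locks = new LockList() {
            public void acquire(String name) {
                events.add("lock " + name);
            }

            public void unlock() {
                events.add("unlock");
            }
        };
        try {
            runExclusive(handler, locks, "Default.Orders", () -> {
                if (failDuringWork) {
                    throw new IllegalStateException("rebalance failed");
                }
                events.add("rebalance");
            });
        } catch (Exception e) {
            events.add("error: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

In this sketch the active entities are resumed before the locks are released, and both steps still run when the work step throws, which matches the nesting shown in the diff.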