Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AEFF2200CD0 for ; Tue, 11 Jul 2017 03:19:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AD330164B0C; Tue, 11 Jul 2017 01:19:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8D3B4164AFF for ; Tue, 11 Jul 2017 03:19:12 +0200 (CEST) Received: (qmail 29201 invoked by uid 500); 11 Jul 2017 01:19:11 -0000 Mailing-List: contact notifications-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list notifications@asterixdb.apache.org Received: (qmail 29192 invoked by uid 99); 11 Jul 2017 01:19:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jul 2017 01:19:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id EC892194DE4 for ; Tue, 11 Jul 2017 01:19:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.92 X-Spam-Level: X-Spam-Status: No, score=0.92 tagged_above=-999 required=6.31 tests=[SPF_FAIL=0.919, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id d7CSt_sAsifU for ; Tue, 11 Jul 2017 01:18:55 +0000 (UTC) Received: from unhygienix.ics.uci.edu (unhygienix.ics.uci.edu [128.195.14.130]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 9BACB5FDBF for ; Tue, 11 Jul 2017 01:18:43 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by unhygienix.ics.uci.edu (Postfix) with ESMTP id F1F9A24084B; Mon, 10 Jul 2017 18:18:42 -0700 (PDT) Date: Mon, 10 Jul 2017 18:18:42 -0700 From: "abdullah alamoudi (Code Review)" Message-ID: Reply-To: bamousaa@gmail.com X-Gerrit-MessageType: newchange Subject: Change in asterixdb[master]: WIP X-Gerrit-Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40 X-Gerrit-ChangeURL: X-Gerrit-Commit: 517ceee61b5bd58562487deeda4db29f430ef95f MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Content-Disposition: inline User-Agent: Gerrit/2.12.7 To: undisclosed-recipients:; archived-at: Tue, 11 Jul 2017 01:19:15 -0000 abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1875 Change subject: WIP ...................................................................... WIP Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40 --- M asterixdb/asterix-active/pom.xml M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java D asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java R asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java R asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java R asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java 53 files changed, 775 insertions(+), 477 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/75/1875/1 diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml index 3dd24b6..fe5efbe 100644 --- a/asterixdb/asterix-active/pom.xml +++ b/asterixdb/asterix-active/pom.xml @@ -31,6 +31,11 @@ ${project.version} + org.apache.asterix + asterix-om + ${project.version} + + log4j log4j diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java index 1141912..434801a 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java @@ -30,7 +30,8 @@ JOB_FINISHED, PARTITION_EVENT, EXTENSION_EVENT, - STATS_UPDATED + STATS_UPDATED, + FALURE } private final JobId jobId; @@ -43,10 +44,6 @@ this.entityId = entityId; this.eventKind = eventKind; this.eventObject = eventObject; - } - - public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) { - this(jobId, eventKind, entityId, null); } public JobId getJobId() { @@ -79,8 +76,8 @@ return true; } ActiveEvent other = (ActiveEvent) o; - return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind) && Objects - .equals(eventObject, other.eventObject); + return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind) + && Objects.equals(eventObject, other.eventObject); } @Override diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java deleted file mode 100644 index 86c3e7d..0000000 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.active; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveEvent.Kind; -import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.IJobLifecycleListener; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class ActiveLifecycleListener implements IJobLifecycleListener { - - private static final Logger LOGGER = Logger.getLogger(ActiveLifecycleListener.class.getName()); - - private final ActiveJobNotificationHandler notificationHandler; - private final LinkedBlockingQueue jobEventInbox; - private final ExecutorService executorService; - - public ActiveLifecycleListener() { - notificationHandler = new ActiveJobNotificationHandler(); - jobEventInbox = notificationHandler.getEventInbox(); - executorService = Executors.newSingleThreadExecutor(); - executorService.execute(notificationHandler); - } - - @Override - public synchronized void notifyJobStart(JobId jobId) throws HyracksException { - EntityId entityId = notificationHandler.getEntity(jobId); - if (entityId != null) { - jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId)); - } - } - - @Override - public synchronized void notifyJobFinish(JobId jobId) throws HyracksException { - EntityId entityId = notificationHandler.getEntity(jobId); - if (entityId != null) { - jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId)); - } else { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); - } - } - } - - @Override - public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { - notificationHandler.notifyJobCreation(jobId, spec); - } - - public void receive(ActivePartitionMessage message) { - jobEventInbox.add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT, - message.getActiveRuntimeId().getEntityId(), message)); - } - - public void stop() { - executorService.shutdown(); - } - - public ActiveJobNotificationHandler getNotificationHandler() { - return notificationHandler; - } -} diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java similarity index 65% rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java index d2b8a89..0853144 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java @@ -19,50 +19,61 @@ package org.apache.asterix.active; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.active.ActiveEvent.Kind; +import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; -public class ActiveJobNotificationHandler implements Runnable { - public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; - private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName()); +public class ActiveNotificationHandler implements IJobLifecycleListener, Runnable { + + private static final Logger LOGGER = Logger.getLogger(ActiveNotificationHandler.class.getName()); private static final boolean DEBUG = false; + public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; private final LinkedBlockingQueue eventInbox; private final Map entityEventListeners; - private final Map jobId2ActiveJobInfos; + private final Map jobId2EntityId; + private final ExecutorService executorService; - public ActiveJobNotificationHandler() { - this.eventInbox = new LinkedBlockingQueue<>(); - this.jobId2ActiveJobInfos = new HashMap<>(); - this.entityEventListeners = new HashMap<>(); + public ActiveNotificationHandler() { + eventInbox = new LinkedBlockingQueue<>(); + jobId2EntityId = new HashMap<>(); + entityEventListeners = new HashMap<>(); + executorService = Executors.newSingleThreadExecutor(); + executorService.execute(this); } @Override public void run() { - Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName()); - LOGGER.log(Level.INFO, "Started " + ActiveJobNotificationHandler.class.getSimpleName()); + Thread.currentThread().setName(ActiveNotificationHandler.class.getSimpleName()); + LOGGER.log(Level.INFO, "Started " + ActiveNotificationHandler.class.getSimpleName()); while (!Thread.interrupted()) { try { - ActiveEvent event = getEventInbox().take(); - EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId()); + ActiveEvent event = eventInbox.take(); + EntityId entityId = jobId2EntityId.get(event.getJobId()); if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind()); if (event.getEventKind() == Kind.JOB_FINISHED) { LOGGER.log(Level.FINER, "Removing the job"); - jobId2ActiveJobInfos.remove(event.getJobId()); + jobId2EntityId.remove(event.getJobId()); } if (listener != null) { LOGGER.log(Level.FINER, "Notifying the listener"); listener.notify(event); } - } else { LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId()); } @@ -72,15 +83,40 @@ LOGGER.log(Level.SEVERE, "Error handling an active job event", e); } } - LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName()); + LOGGER.log(Level.INFO, "Stopped " + ActiveNotificationHandler.class.getSimpleName()); } - public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException { - LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); - unregisterListener(listener); + @Override + public synchronized void notifyJobStart(JobId jobId) throws HyracksException { + EntityId entityId = getEntity(jobId); + if (entityId != null) { + eventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null)); + } } - public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) { + @Override + public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List exceptions) + throws HyracksException { + EntityId entityId = getEntity(jobId); + if (entityId != null) { + eventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); + } else { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); + } + } + } + + public void receive(ActivePartitionMessage message) { + eventInbox.add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT, + message.getActiveRuntimeId().getEntityId(), message)); + } + + public void stop() { + executorService.shutdown(); + } + + public IActiveEntityEventsListener getListener(EntityId entityId) { if (DEBUG) { LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); IActiveEntityEventsListener listener = entityEventListeners.get(entityId); @@ -90,10 +126,11 @@ } public EntityId getEntity(JobId jobId) { - return jobId2ActiveJobInfos.get(jobId); + return jobId2EntityId.get(jobId); } - public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) { + @Override + public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { LOGGER.log(Level.FINER, "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId); Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); @@ -103,18 +140,11 @@ } EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); - boolean found = jobId2ActiveJobInfos.get(jobId) != null; + boolean found = jobId2EntityId.get(jobId) != null; LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : "Inactive")); - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - if (listener != null) { - // It is okay to bypass the event inbox in this case because we know this is the first event for this entity - listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); + if (!eventInbox.offer(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification))) { + throw new HyracksDataException("Full active event inbox"); } - LOGGER.log(Level.FINER, "Listener was notified" + jobId); - } - - public LinkedBlockingQueue getEventInbox() { - return eventInbox; } public synchronized IActiveEntityEventsListener[] getEventListeners() { @@ -147,14 +177,14 @@ } } - public synchronized void monitorJob(JobId jobId, EntityId activeJob) { + public synchronized void monitorJob(JobId jobId, EntityId entityId) { if (DEBUG) { LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); - boolean found = jobId2ActiveJobInfos.get(jobId) != null; + boolean found = jobId2EntityId.get(jobId) != null; LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive")); } - if (entityEventListeners.containsKey(activeJob)) { - if (jobId2ActiveJobInfos.containsKey(jobId)) { + if (entityEventListeners.containsKey(entityId)) { + if (jobId2EntityId.containsKey(jobId)) { LOGGER.severe("Job is already being monitored for job: " + jobId); return; } @@ -162,8 +192,8 @@ LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId); } } else { - LOGGER.severe("No listener was found for the entity: " + activeJob); + LOGGER.severe("No listener was found for the entity: " + entityId); } - jobId2ActiveJobInfos.put(jobId, activeJob); + jobId2EntityId.put(jobId, entityId); } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java index af8f5ca..1430eb8 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java @@ -20,27 +20,36 @@ public enum ActivityState { /** - * The initial state of an activity. - */ - CREATED, - /** - * The starting state and a possible terminal state. Next state can only be {@code ActivityState.STARTING} + * The starting state and a possible terminal state. Next state can only be {@code ActivityState.CREATED} */ STOPPED, /** - * A terminal state + * An unexpected failure caused the activity to stop */ FAILED, /** - * An intermediate state. Next state can only be {@code ActivityState.STARTED} or {@code ActivityState.FAILED} + * An intermediate state. Next state can only be {@code ActivityState.RUNNING} or {@code ActivityState.FAILED} */ STARTING, /** - * An intermediate state. Next state can only be {@code ActivityState.STOPPING} or {@code ActivityState.FAILED} + * An intermediate state. Next state can only be + * {@code ActivityState.STOPPING}, {@code ActivityState.SUSPENDING}, or {@code ActivityState.FAILED} */ - STARTED, + RUNNING, /** * An intermediate state. Next state can only be {@code ActivityState.STOPPED} or {@code ActivityState.FAILED} */ - STOPPING + STOPPING, + /** + * An intermediate state. Next state can only be {@code ActivityState.SUSPENDED} or {@code ActivityState.FAILED} + */ + SUSPENDING, + /** + * An intermediate state. Next state can only be {@code ActivityState.STOPPED} or {@code ActivityState.RESUMING} + */ + SUSPENDED, + /** + * An intermediate state. Next state can only be {@code ActivityState.RUNNING} or {@code ActivityState.FAILED} + */ + RESUMING } \ No newline at end of file diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index 4bc02f3..9cbb2c1 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -79,4 +79,23 @@ * @throws HyracksDataException */ void refreshStats(long timeout) throws HyracksDataException; + + /** + * resume the activity + * + * @throws HyracksDataException + */ + void resume() throws HyracksDataException; + + /** + * indicates that a dataset is no longer being used by this active entity + * + * @param dataset + */ + void remove(IDataset dataset); + + /** + * @return true, if entity is active, false otherwise + */ + boolean isActive(); } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java index 69f7f1c..da7be77 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java @@ -43,14 +43,9 @@ /** * Wait until the terminal event has been received * - * @throws InterruptedException + * @throws Exception */ - void sync() throws InterruptedException; - - /** - * Stop watching events - */ - void unsubscribe(); + void sync() throws Exception; /** * callback upon successful subscription diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java similarity index 98% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java index 40bc7d8..1344f5f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; +package org.apache.asterix.active; import java.io.Serializable; import java.util.Map; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java similarity index 96% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java index 48df79b..349c7d7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; +package org.apache.asterix.active; import java.io.Serializable; @@ -29,6 +29,7 @@ * to be implemented by each adapter irrespective of the the kind of * adapter(pull or push). */ +@FunctionalInterface public interface IDataSourceAdapter extends Serializable { public enum AdapterType { @@ -38,6 +39,7 @@ /** * Triggers the adapter to begin ingesting data from the external source. + * * @param partition * The adapter could be running with a degree of parallelism. * partition corresponds to the i'th parallel instance. diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java new file mode 100644 index 0000000..a010984 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active; + +@FunctionalInterface +public interface IRetryPolicy { + /** + * @return true if one more attempt should be done + */ + boolean retry(); +} diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java new file mode 100644 index 0000000..eae568c --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active; + +@FunctionalInterface +public interface IRetryPolicyFactory { + /** + * @return an instance of retry policy + */ + IRetryPolicy create(); +} diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java new file mode 100644 index 0000000..af6be01 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active; + +public class NoRetryPolicyFactory implements IRetryPolicyFactory { + public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory(); + private static final IRetryPolicy policy = () -> false; + + private NoRetryPolicyFactory() { + } + + @Override + public IRetryPolicy create() { + return policy; + } +} diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index 9391044..e0388c2 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -21,7 +21,7 @@ import java.io.Serializable; import java.util.Objects; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; @@ -64,7 +64,7 @@ @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); + ActiveNotificationHandler activeListener = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); activeListener.receive(this); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 19f0dcc..02ea321 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.List; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.lang.common.statement.Query; @@ -164,4 +166,14 @@ */ String getActiveDataverseName(String dataverse); + /** + * @return the storage component provider + */ + IStorageComponentProvider getComponentProvider(); + + /** + * @return the application context + */ + ICcApplicationContext getAppCtx(); + } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java index 593faa6..ed8d430 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java @@ -18,14 +18,12 @@ */ package org.apache.asterix.api.http.server; -import java.io.IOException; import java.io.PrintWriter; import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import io.netty.handler.codec.http.HttpResponseStatus; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.feed.watch.StatsSubscriber; @@ -38,19 +36,21 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.handler.codec.http.HttpResponseStatus; + public class ActiveStatsApiServlet extends AbstractServlet { private static final Logger LOGGER = Logger.getLogger(ActiveStatsApiServlet.class.getName()); private static final int DEFAULT_EXPIRE_TIME = 2000; - private final ActiveLifecycleListener activeLifecycleListener; + private final ActiveNotificationHandler activeLifecycleListener; public ActiveStatsApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx) { super(ctx, paths); - this.activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); + this.activeLifecycleListener = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); } private JsonNode constructNode(ObjectMapper om, IActiveEntityEventsListener eventListener, long currentTime, - long ttl) throws InterruptedException, IOException { + long ttl) throws Exception { long statsTimeStamp = eventListener.getStatsTimeStamp(); if (currentTime - statsTimeStamp > ttl) { StatsSubscriber subscriber = new StatsSubscriber(eventListener); @@ -66,7 +66,7 @@ // Obtain all feed status String localPath = localPath(request); int expireTime; - IActiveEntityEventsListener[] listeners = activeLifecycleListener.getNotificationHandler().getEventListeners(); + IActiveEntityEventsListener[] listeners = activeLifecycleListener.getEventListeners(); ObjectMapper om = new ObjectMapper(); om.enable(SerializationFeature.INDENT_OUTPUT); ObjectNode resNode = om.createObjectNode(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java similarity index 65% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 3216bfe..63ac5ab 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.feed.management; +package org.apache.asterix.app.active; import java.util.ArrayList; import java.util.Arrays; @@ -27,21 +27,24 @@ import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActiveEvent.Kind; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.StatsRequestMessage; -import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.metadata.IDataset; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -51,41 +54,53 @@ public class ActiveEntityEventsListener implements IActiveEntityEventsListener { private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName()); - - enum RequestState { - INIT, - STARTED, - FINISHED - } - - // members - protected volatile ActivityState state; - protected JobId jobId; + // finals + protected final ActiveNotificationHandler handler; protected final List subscribers = new ArrayList<>(); - protected final ICcApplicationContext appCtx; + protected final IStatementExecutor statementExecutor; + protected final MetadataProvider metadataProvider; + protected final IHyracksClientConnection hcc; protected final EntityId entityId; - protected final List datasets; + protected final List datasets; protected final ActiveEvent statsUpdatedEvent; + protected final String runtimeName; + protected final IRetryPolicyFactory retryPolicyFactory; + // mutables + protected volatile ActivityState state; + protected AlgebricksAbsolutePartitionConstraint locations; + protected ActivityState prevState; + protected JobId jobId; protected long statsTimestamp; protected String stats; - protected RequestState statsRequestState; - protected final String runtimeName; - protected final AlgebricksAbsolutePartitionConstraint locations; + protected boolean isFetchingStats; protected int numRegistered; - public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId entityId, List datasets, - AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { - this.appCtx = appCtx; + public ActiveEntityEventsListener(IStatementExecutor statementExecutor, IHyracksClientConnection hcc, + EntityId entityId, List datasets, AlgebricksAbsolutePartitionConstraint locations, + String runtimeName, IRetryPolicyFactory retryPolicyFactory) throws HyracksDataException { + this.statementExecutor = statementExecutor; + this.metadataProvider = + new MetadataProvider(statementExecutor.getAppCtx(), null, statementExecutor.getComponentProvider()); + this.hcc = hcc; this.entityId = entityId; this.datasets = datasets; + this.retryPolicyFactory = retryPolicyFactory; this.state = ActivityState.STOPPED; this.statsTimestamp = -1; - this.statsRequestState = RequestState.INIT; - this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId); + this.isFetchingStats = false; + this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null); this.stats = "{\"Stats\":\"N/A\"}"; this.runtimeName = runtimeName; this.locations = locations; this.numRegistered = 0; + this.handler = + ((ActiveNotificationHandler) metadataProvider.getApplicationContext().getActiveNotificationHandler()); + handler.registerListener(this); + } + + protected void setState(ActivityState newState) { + this.prevState = state; + this.state = newState; } @Override @@ -95,13 +110,13 @@ ActiveEvent.Kind eventKind = event.getEventKind(); switch (eventKind) { case JOB_CREATED: - state = ActivityState.CREATED; + jobCreated(event); break; case JOB_STARTED: start(event); break; case JOB_FINISHED: - finish(); + finish(event); break; case PARTITION_EVENT: handle((ActivePartitionMessage) event.getEventObject()); @@ -116,26 +131,29 @@ } } + protected synchronized void jobCreated(ActiveEvent event) { + setState(ActivityState.STARTING); + } + protected synchronized void handle(ActivePartitionMessage message) { if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { numRegistered++; if (numRegistered == locations.getLocations().length) { - state = ActivityState.STARTED; + setState(ActivityState.RUNNING); } } } - private void finish() throws Exception { - IHyracksClientConnection hcc = appCtx.getHcc(); - JobStatus status = hcc.getJobStatus(jobId); - state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED; - ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - activeLcListener.getNotificationHandler().removeListener(this); + @SuppressWarnings("unchecked") + protected void finish(ActiveEvent event) throws HyracksDataException { + jobId = null; + Pair> status = (Pair>) event.getEventObject(); + setState(status.getLeft().equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED); } - private void start(ActiveEvent event) { + protected void start(ActiveEvent event) { this.jobId = event.getJobId(); - state = ActivityState.STARTING; + numRegistered = 0; } @Override @@ -161,7 +179,12 @@ @Override public boolean isEntityUsingDataset(IDataset dataset) { - return datasets.contains(dataset); + return isActive() && datasets.contains(dataset); + } + + @Override + public void remove(IDataset dataset) { + datasets.remove(dataset); } public JobId getJobId() { @@ -193,15 +216,16 @@ public void refreshStats(long timeout) throws HyracksDataException { LOGGER.log(Level.INFO, "refreshStats called"); synchronized (this) { - if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) { - LOGGER.log(Level.INFO, "returning immediately since state = " + state + " and statsRequestState = " - + statsRequestState); + if (state != ActivityState.RUNNING || isFetchingStats) { + LOGGER.log(Level.INFO, + "returning immediately since state = " + state + " and fetchingStats = " + isFetchingStats); return; } else { - statsRequestState = RequestState.STARTED; + isFetchingStats = true; } } - ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + ICCMessageBroker messageBroker = + (ICCMessageBroker) metadataProvider.getApplicationContext().getServiceContext().getMessageBroker(); long reqId = messageBroker.newRequestId(); List requests = new ArrayList<>(); List ncs = Arrays.asList(locations.getLocations()); @@ -217,8 +241,7 @@ } catch (Exception e) { throw HyracksDataException.create(e); } - // Same as above - statsRequestState = RequestState.FINISHED; + isFetchingStats = false; } protected synchronized void notifySubscribers(ActiveEvent event) { @@ -245,4 +268,29 @@ return locations; } + public void suspend() throws HyracksDataException { + throw new HyracksDataException("Unsupported"); + } + + @Override + public void resume() throws HyracksDataException { + throw new HyracksDataException("Unsupported"); + } + + @Override + public boolean isActive() { + return state != ActivityState.STOPPED && state != ActivityState.FAILED; + } + + public void unregister() throws HyracksDataException { + handler.unregisterListener(this); + } + + public void start(MetadataProvider metadataProvider) throws HyracksDataException { + throw new HyracksDataException("Unsupported"); + } + + public void stop(MetadataProvider metadataProvider) throws HyracksDataException { + throw new HyracksDataException("Unsupported"); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java new file mode 100644 index 0000000..9dd1f54 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.active; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.active.ActiveNotificationHandler; +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.IRetryPolicyFactory; +import org.apache.asterix.common.metadata.IDataset; +import org.apache.asterix.common.utils.JobUtils; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.utils.FeedOperations; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobSpecification; + +public class FeedEventsListener extends ActiveEntityEventsListener { + + private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName()); + private final Feed feed; + private final List feedConnections; + + public FeedEventsListener(IStatementExecutor statementExecutor, IHyracksClientConnection hcc, EntityId entityId, + List datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName, + IRetryPolicyFactory retryPolicyFactory, Feed feed, final List feedConnections) + throws HyracksDataException { + super(statementExecutor, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory); + this.feed = feed; + this.feedConnections = feedConnections; + } + + @Override + public synchronized void remove(IDataset dataset) { + super.remove(dataset); + feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName()) + && o.getDatasetName().equals(dataset.getDatasetName())); + } + + @Override + public synchronized void start(MetadataProvider metadataProvider) throws HyracksDataException { + if (state != ActivityState.FAILED && state != ActivityState.STOPPED) { + throw new HyracksDataException("Feed " + entityId.getEntityName() + " is started already."); + } + try { + setState(ActivityState.STARTING); + Pair jobInfo = + FeedOperations.buildStartFeedJob(statementExecutor, metadataProvider, feed, feedConnections, hcc); + JobSpecification feedJob = jobInfo.getLeft(); + IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(this, ActivityState.RUNNING); + feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. + // We will need to design general exception handling mechanism for feeds. + locations = jobInfo.getRight(); + JobUtils.runJob(hcc, feedJob, + Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION))); + eventSubscriber.sync(); + } catch (Exception e) { + setState(ActivityState.FAILED); + throw HyracksDataException.create(e); + } + LOGGER.log(Level.INFO, "Submitted"); + } + + @Override + public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException { + try { + if (state == ActivityState.STOPPED) { + throw new HyracksDataException("Feed " + entityId.getEntityName() + " is not started."); + } + if (jobId == null) { + setState(ActivityState.STOPPED); + return; + } + IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(this, ActivityState.STOPPED); + // Construct ActiveMessage + for (int i = 0; i < getLocations().getLocations().length; i++) { + String intakeLocation = getLocations().getLocations()[i]; + FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId, intakeLocation, + i); + } + eventSubscriber.sync(); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + public Feed getFeed() { + return feed; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index 0e72976..bbfe680 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -33,11 +33,11 @@ import javax.xml.bind.JAXBContext; import javax.xml.bind.Unmarshaller; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.external.library.ExternalLibrary; import org.apache.asterix.external.library.LibraryAdapter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 2967a38..0f3b005 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -41,17 +41,18 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.algebra.extension.IExtensionStatement; import org.apache.asterix.api.common.APIFramework; import org.apache.asterix.api.http.server.AbstractQueryApiServlet; import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.api.http.server.ResultUtil; +import org.apache.asterix.app.active.ActiveEntityEventsListener; +import org.apache.asterix.app.active.FeedEventsListener; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.config.ClusterProperties; @@ -71,16 +72,12 @@ import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.lang.common.base.IReturningStatement; import org.apache.asterix.lang.common.base.IRewriterFactory; @@ -178,7 +175,6 @@ import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Triple; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; @@ -210,7 +206,7 @@ public static final boolean IS_DEBUG_MODE = false;// true protected final List statements; - protected final ICcApplicationContext appCtx; + private final ICcApplicationContext appCtx; protected final SessionOutput sessionOutput; protected final SessionConfig sessionConfig; protected Dataverse activeDataverse; @@ -693,8 +689,8 @@ protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset) throws CompilationException { StringBuilder builder = null; - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { if (listener.isEntityUsingDataset(dataset)) { @@ -1169,26 +1165,31 @@ throw new AlgebricksException("There is no dataverse with this name " + dataverseName + "."); } } + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + bActiveTxn = false; // # disconnect all feeds from any datasets in the dataverse. - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] activeListeners = activeEventHandler.getEventListeners(); - Identifier dvId = new Identifier(dataverseName); - MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(), - metadataProvider.getStorageComponentProvider()); - tempMdProvider.setConfig(metadataProvider.getConfig()); for (IActiveEntityEventsListener listener : activeListeners) { EntityId activeEntityId = listener.getEntityId(); if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) && activeEntityId.getDataverse().equals(dataverseName)) { - tempMdProvider.getLocks().reset(); - stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())), - tempMdProvider); - // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider, - MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); + if (listener.getState() != ActivityState.STOPPED) { + ((ActiveEntityEventsListener) listener).stop(metadataProvider); + } + FeedEventsListener feedListener = (FeedEventsListener) listener; + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); + doDropFeed(hcc, metadataProvider, feedListener.getFeed()); + MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext()); + bActiveTxn = false; } } + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); // #. prepare jobs which will drop corresponding datasets with indexes. List datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName); @@ -1295,20 +1296,6 @@ } } - protected void stopFeedBeforeDelete(Pair feedNameComp, MetadataProvider metadataProvider) { - StopFeedStatement disStmt = new StopFeedStatement(feedNameComp); - try { - handleStopFeedStatement(metadataProvider, disStmt); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Stopped feed " + feedNameComp.second.getValue()); - } - } catch (Exception exception) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to stop feed " + feedNameComp.second.getValue() + exception); - } - } - } - public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt; @@ -1403,8 +1390,8 @@ throw new AlgebricksException( "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); } - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); StringBuilder builder = null; for (IActiveEntityEventsListener listener : listeners) { @@ -1968,32 +1955,35 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); return; } - - EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - ActiveEntityEventsListener listener = - (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId); - if (listener != null) { - throw new AlgebricksException("Feed " + feedId - + " is currently active and connected to the following dataset(s) \n" + listener.toString()); - } else { - JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, - MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); - runJob(hcc, spec); - MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName); - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Removed feed " + feedId); - } + doDropFeed(hcc, metadataProvider, feed); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; } finally { metadataProvider.getLocks().unlock(); + } + } + + protected void doDropFeed(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Feed feed) + throws Exception { + MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); + EntityId feedId = feed.getFeedId(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(feedId); + if (listener != null && listener.getState() != ActivityState.STOPPED) { + throw new AlgebricksException("Feed " + feedId + + " is currently active and connected to the following dataset(s) \n" + listener.toString()); + } else if (listener != null) { + listener.unregister(); + } + JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, + MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); + runJob(hcc, spec); + MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(), feed.getFeedName()); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Removed feed " + feedId); } } @@ -2029,56 +2019,41 @@ StartFeedStatement sfs = (StartFeedStatement) stmt; String dataverseName = getActiveDataverse(sfs.getDataverseName()); String feedName = sfs.getFeedName().getValue(); - // Transcation handler MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - // Runtime handler - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - // Feed & Feed Connections - Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, - metadataProvider.getMetadataTxnContext()); - List feedConnections = MetadataManager.INSTANCE - .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); - ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); - IStorageComponentProvider storageComponentProvider = new StorageComponentProvider(); - DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler - .getActiveEntityListener(entityId); - if (listener != null) { - throw new AlgebricksException("Feed " + feedName + " is started already."); - } - // Start + boolean committed = false; MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), dataverseName, - dataverseName + "." + feedName, feedConnections); + dataverseName + "." + feedName); try { - // Prepare policy - List datasets = new ArrayList<>(); - for (FeedConnection connection : feedConnections) { - Dataset ds = metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); - datasets.add(ds); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + // Runtime handler + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + // Feed & Feed Connections + Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, + metadataProvider.getMetadataTxnContext()); + List feedConnections = MetadataManager.INSTANCE + .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); + MetadataLockManager.INSTANCE.lockFeedConnections(metadataProvider.getLocks(), feedConnections); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); + if (listener == null) { + // Prepare policy + List datasets = new ArrayList<>(); + for (FeedConnection connection : feedConnections) { + Dataset ds = + metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); + datasets.add(ds); + } + listener = new FeedEventsListener(this, hcc, entityId, datasets, null, + FeedIntakeOperatorNodePushable.class.getSimpleName(), NoRetryPolicyFactory.INSTANCE, feed, + feedConnections); } - org.apache.commons.lang3.tuple.Pair jobInfo = - FeedOperations.buildStartFeedJob(sessionOutput, metadataProvider, feed, feedConnections, - compilationProvider, storageComponentProvider, qtFactory, hcc); - - JobSpecification feedJob = jobInfo.getLeft(); - listener = new ActiveEntityEventsListener(appCtx, entityId, datasets, jobInfo.getRight(), - FeedIntakeOperatorNodePushable.class.getSimpleName()); - activeEventHandler.registerListener(listener); - IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STARTED); - feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); - // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. - // We will need to design general exception handling mechanism for feeds. - JobUtils.runJob(hcc, feedJob, - Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION))); - eventSubscriber.sync(); - LOGGER.log(Level.INFO, "Submitted"); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + committed = true; + listener.start(metadataProvider); } catch (Exception e) { - abort(e, e, mdTxnCtx); - if (listener != null) { - activeEventHandler.unregisterListener(listener); + if (!committed) { + abort(e, e, mdTxnCtx); } throw e; } finally { @@ -2090,32 +2065,18 @@ StopFeedStatement sfst = (StopFeedStatement) stmt; String dataverseName = getActiveDataverse(sfst.getDataverseName()); String feedName = sfst.getFeedName().getValue(); - EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); // Obtain runtime info from ActiveListener - ActiveEntityEventsListener listener = - (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); if (listener == null) { throw new AlgebricksException("Feed " + feedName + " is not started."); } - IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STOPPED); - // Transaction - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), dataverseName, feedName); + MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), entityId.getDataverse(), + entityId.getEntityName()); try { - // validate - FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx); - // Construct ActiveMessage - for (int i = 0; i < listener.getLocations().getLocations().length; i++) { - String intakeLocation = listener.getLocations().getLocations()[i]; - FeedOperations.SendStopMessageToNode(appCtx, feedId, intakeLocation, i); - } - eventSubscriber.sync(); - } catch (Exception e) { - abort(e, e, mdTxnCtx); - throw e; + listener.stop(metadataProvider); } finally { metadataProvider.getLocks().unlock(); } @@ -2131,10 +2092,9 @@ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); // Check whether feed is alive - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - if (activeEventHandler - .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) { + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + if (activeEventHandler.getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) { throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); } // Transaction handling @@ -2185,21 +2145,26 @@ String feedName = cfs.getFeedName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - // Check whether feed is alive - if (activeEventHandler - .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) { - throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); - } MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName, dataverseName + "." + cfs.getFeedName()); try { + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + // Check whether feed is alive + IActiveEntityEventsListener listener = + activeEventHandler.getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)); + if (listener != null && listener.isActive()) { + throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); + } FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx); FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx); FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName, feedName, datasetName); + Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); + if (ds == null) { + throw new CompilationException("Dataset " + dataverseName + "." + datasetName + " doesn't exist"); + } if (fc == null) { throw new CompilationException("Feed " + feedName + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); @@ -2211,6 +2176,9 @@ MetadataManager.INSTANCE.updateFunction(mdTxnCtx, function); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + if (listener != null) { + listener.remove(ds); + } } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; @@ -2952,4 +2920,14 @@ IStatementRewriter rewriter = rewriterFactory.createStatementRewriter(); rewriter.rewrite(stmt); } + + @Override + public IStorageComponentProvider getComponentProvider() { + return componentProvider; + } + + @Override + public ICcApplicationContext getAppCtx() { + return appCtx; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index cd9138a..c1def23 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -30,7 +30,7 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; @@ -134,7 +134,7 @@ statementExecutorCtx = new StatementExecutorContext(); appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager, () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy, - new ActiveLifecycleListener(), componentProvider); + new ActiveNotificationHandler(), componentProvider); ClusterStateManager.INSTANCE.setCcAppCtx(appCtx); ccExtensionManager = new CCExtensionManager(getExtensions()); appCtx.setExtensionManager(ccExtensionManager); @@ -147,7 +147,7 @@ setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort())); ccServiceCtx.setDistributedState(proxy); MetadataManager.initialize(proxy, metadataProperties); - ccServiceCtx.addJobLifecycleListener(appCtx.getActiveLifecycleListener()); + ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler()); // create event loop groups webManager = new WebManager(); @@ -178,7 +178,7 @@ @Override public void stop() throws Exception { - ((ActiveLifecycleListener) appCtx.getActiveLifecycleListener()).stop(); + ((ActiveNotificationHandler) appCtx.getActiveNotificationHandler()).stop(); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Stopping Asterix cluster controller"); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 09c4983..df4825a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -30,9 +30,9 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter.AdapterType; import org.apache.asterix.active.message.ActiveManagerMessage; -import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.ACIDException; @@ -40,8 +40,6 @@ import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; @@ -50,6 +48,9 @@ import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor; +import org.apache.asterix.external.provider.AdapterFactoryProvider; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; @@ -57,19 +58,23 @@ import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.util.FunctionUtil; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Feed; import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; import org.apache.asterix.metadata.feeds.LocationConstraint; +import org.apache.asterix.metadata.utils.MetadataConstants; +import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.asterix.translator.CompiledStatements; import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.SessionOutput; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -109,6 +114,55 @@ public class FeedOperations { private FeedOperations() { + } + + public static AlgebricksAbsolutePartitionConstraint getFeedLocations(Feed feed, ICcApplicationContext appCtx, + MetadataTransactionContext mdTxnCtx) throws Exception { + return getFeedFactory(feed, appCtx, mdTxnCtx).getPartitionConstraint(); + } + + public static IAdapterFactory getFeedFactory(Feed feed, ICcApplicationContext appCtx, + MetadataTransactionContext mdTxnCtx) throws Exception { + String adapterName = feed.getAdapterName(); + Map configuration = feed.getAdapterConfiguration(); + ARecordType adapterOutputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(), + ExternalDataConstants.KEY_TYPE_NAME); + ARecordType metaType = + FeedMetadataUtil.getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME); + ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName()); + // Get adapter from metadata dataset + DatasourceAdapter adapterEntity = + MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName); + // Get adapter from metadata dataset + if (adapterEntity == null) { + adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName); + } + IAdapterFactory adapterFactory; + if (adapterEntity != null) { + AdapterType adapterType = adapterEntity.getType(); + String adapterFactoryClassname = adapterEntity.getClassname(); + switch (adapterType) { + case INTERNAL: + adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance(); + break; + case EXTERNAL: + String[] anameComponents = adapterName.split("#"); + String libraryName = anameComponents[0]; + ClassLoader cl = + appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName); + adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); + break; + default: + throw new AsterixException("Unknown Adapter type " + adapterType); + } + adapterFactory.setOutputType(adapterOutputType); + adapterFactory.setMetaType(metaType); + adapterFactory.configure(appCtx.getServiceContext(), configuration); + } else { + adapterFactory = AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName, + configuration, adapterOutputType, metaType); + } + return adapterFactory; } private static Pair buildFeedIntakeJobSpec(Feed feed, @@ -151,9 +205,8 @@ return spec; } - private static JobSpecification getConnectionJob(SessionOutput sessionOutput, MetadataProvider metadataProvider, - FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider, - IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory, + private static JobSpecification getConnectionJob(IStatementExecutor statementExecutor, + MetadataProvider metadataProvider, FeedConnection feedConnection, String[] locations, IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException { DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedConnection.getDataverseName())); FeedConnectionRequest fcr = @@ -164,8 +217,6 @@ List statements = new ArrayList<>(); statements.add(dataverseDecl); statements.add(subscribeStmt); - IStatementExecutor translator = qtFactory.create(metadataProvider.getApplicationContext(), statements, - sessionOutput, compilationProvider, storageComponentProvider); // configure the metadata provider metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE); metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy()); @@ -174,7 +225,7 @@ CompiledStatements.CompiledSubscribeFeedStatement csfs = new CompiledStatements.CompiledSubscribeFeedStatement( subscribeStmt.getSubscriptionRequest(), subscribeStmt.getVarCounter()); - return translator.rewriteCompileQuery(hcc, metadataProvider, subscribeStmt.getQuery(), csfs); + return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, subscribeStmt.getQuery(), csfs); } private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed, @@ -272,8 +323,8 @@ } // make connections between operators - for (Entry, - Pair>> entry : subJob.getConnectorOperatorMap().entrySet()) { + for (Entry, Pair>> entry : subJob + .getConnectorOperatorMap().entrySet()) { ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey()); IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId); Pair leftOp = entry.getValue().getLeft(); @@ -356,10 +407,8 @@ } public static Pair buildStartFeedJob( - SessionOutput sessionOutput, MetadataProvider metadataProvider, Feed feed, - List feedConnections, ILangCompilationProvider compilationProvider, - IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory, - IHyracksClientConnection hcc) throws Exception { + IStatementExecutor statementExecutor, MetadataProvider metadataProvider, Feed feed, + List feedConnections, IHyracksClientConnection hcc) throws Exception { FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>()); // TODO: Change the default Datasource to use all possible partitions Pair intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa); @@ -371,8 +420,8 @@ String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations(); // Add connection job for (FeedConnection feedConnection : feedConnections) { - JobSpecification connectionJob = getConnectionJob(sessionOutput, metadataProvider, feedConnection, - ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc); + JobSpecification connectionJob = + getConnectionJob(statementExecutor, metadataProvider, feedConnection, ingestionLocations, hcc); jobsList.add(connectionJob); } return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections, diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index 956d111..a9e269a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -21,23 +21,28 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; +import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.algebra.base.ILangExtension.Language; +import org.apache.asterix.app.active.ActiveEntityEventsListener; +import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.IDataset; -import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.test.runtime.ExecutionTestUtil; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; @@ -71,8 +76,8 @@ String requestedStats; CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); - ActiveLifecycleListener activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeJobNotificationHandler = activeLifecycleListener.getNotificationHandler(); + ActiveNotificationHandler activeJobNotificationHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); JobId jobId = new JobId(1); // Mock ActiveRuntime @@ -82,13 +87,18 @@ // Mock JobSpecification JobSpecification jobSpec = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) - .thenReturn(entityId); + Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId); + // Mock MetadataProvider + CCExtensionManager extensionManager = (CCExtensionManager) appCtx.getExtensionManager(); + IStatementExecutor statementExecutor = extensionManager + .getStatementExecutorFactory(appCtx.getServiceContext().getControllerService().getExecutor()) + .create(appCtx, Collections.emptyList(), Mockito.mock(SessionOutput.class), + extensionManager.getCompilationProvider(Language.SQLPP), appCtx.getStorageComponentProvider()); // Add event listener - ActiveEntityEventsListener eventsListener = new ActiveEntityEventsListener(appCtx, entityId, datasetList, - partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName()); - activeJobNotificationHandler.registerListener(eventsListener); + ActiveEntityEventsListener eventsListener = + new ActiveEntityEventsListener(statementExecutor, null, entityId, datasetList, partitionConstraint, + FeedIntakeOperatorNodePushable.class.getSimpleName(), NoRetryPolicyFactory.INSTANCE); // Register mock runtime NCAppRuntimeContext nc1AppCtx = @@ -107,14 +117,14 @@ eventsListener.subscribe(startingSubscriber); // Update stats of created/started job without joined partition activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); - activeLifecycleListener.notifyJobStart(jobId); + activeJobNotificationHandler.notifyJobStart(jobId); startingSubscriber.sync(); eventsListener.refreshStats(1000); requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); // Fake partition message and notify eventListener - WaitForStateSubscriber startedSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTED); + WaitForStateSubscriber startedSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.RUNNING); eventsListener.subscribe(startedSubscriber); ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 249bd56..8e84782 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -63,9 +63,9 @@ IFaultToleranceStrategy getFaultToleranceStrategy(); /** - * @return the active lifecycle listener at Cluster controller + * @return the active notification handler at Cluster controller */ - IJobLifecycleListener getActiveLifecycleListener(); + IJobLifecycleListener getActiveNotificationHandler(); /** * @return a new instance of {@link IHyracksClientConnection} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java index a0e6e71..48507a7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java @@ -32,4 +32,13 @@ */ int[] getPrimaryBloomFilterFields(); + /** + * @return the dataverse name + */ + String getDataverseName(); + + /** + * @return the dataset name + */ + String getDatasetName(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 2eb81d4..f29a5e5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -23,13 +23,13 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataFlowController; import org.apache.asterix.external.api.IDataParserFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IIndexibleExternalDataSource; import org.apache.asterix.external.api.IIndexingAdapterFactory; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java index 37cc1cf..d142cba 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; public interface IIndexingAdapterFactory extends IAdapterFactory { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index e6d81d3..8904a18 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.external.dataset.adapter; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java index 0681d71..f3c3726 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.external.dataset.adapter; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.external.api.IDataFlowController; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java index 822d725..d351c51 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java @@ -20,37 +20,41 @@ import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveEventSubscriber; -import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class AbstractSubscriber implements IActiveEventSubscriber { protected final IActiveEntityEventsListener listener; - private boolean done = false; + private volatile boolean done = false; + private volatile Exception failure = null; public AbstractSubscriber(IActiveEntityEventsListener listener) { this.listener = listener; } @Override - public synchronized boolean isDone() { + public boolean isDone() { return done; } - public synchronized void complete() throws HyracksDataException { - done = true; - notifyAll(); - } - - @Override - public synchronized void sync() throws InterruptedException { - while (!done) { - wait(); + public void complete(Exception failure) { + synchronized (listener) { + done = true; + if (failure != null) { + this.failure = failure; + } + listener.notifyAll(); } } @Override - public synchronized void unsubscribe() { - done = true; - notifyAll(); + public void sync() throws Exception { + synchronized (listener) { + while (!done) { + if (failure != null) { + throw failure; + } + listener.wait(); + } + } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java index 42f7a74..90cf45f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java @@ -49,11 +49,6 @@ } @Override - public void unsubscribe() { - // no op - } - - @Override public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException { // no op } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java index fa2fa7f..804fbd8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java @@ -32,7 +32,17 @@ @Override public void notify(ActiveEvent event) throws HyracksDataException { if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) { - complete(); + try { + complete(null); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } else if (event.getEventKind() == ActiveEvent.Kind.FALURE) { + try { + complete((Exception) event.getEventObject()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java index 7bab421..6ae62ae 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java @@ -39,7 +39,13 @@ @Override public void notify(ActiveEvent event) throws HyracksDataException { if (listener.getState() == targetState) { - complete(); + complete(null); + } else if (event.getEventKind() == ActiveEvent.Kind.FALURE) { + try { + complete((Exception) event.getEventObject()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } } } @@ -49,7 +55,7 @@ throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY); } if (listener.getState() == targetState) { - complete(); + complete(null); } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java index 93acb26..7ff429d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.external.operators; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 3a06a2b..f38a005 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -22,11 +22,11 @@ import java.util.logging.Logger; import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.om.types.ARecordType; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 8c6a420..4ece727 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -21,9 +21,9 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActiveSourceOperatorNodePushable; import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java index d6ac5d1..aecb790 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java @@ -21,9 +21,9 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IIndexingAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.util.ExternalDataCompatibilityUtils; diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index 1c28940..72efc95 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -22,10 +22,10 @@ import java.io.InputStream; import java.util.Map; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.ITupleForwarder; import org.apache.asterix.external.parser.ADMDataParser; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java index c84a5bd..08ca03b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java @@ -57,8 +57,7 @@ // Key is dataverse name. Key of value map is dataset name. protected final Map> datasets = new HashMap<>(); // Key is dataverse name. Key of value map is dataset name. Key of value map of value map is index name. - protected final Map>> indexes = - new HashMap<>(); + protected final Map>> indexes = new HashMap<>(); // Key is dataverse name. Key of value map is datatype name. protected final Map> datatypes = new HashMap<>(); // Key is dataverse name. @@ -66,19 +65,16 @@ // Key is function Identifier . Key of value map is function name. protected final Map functions = new HashMap<>(); // Key is adapter dataverse. Key of value map is the adapter name - protected final Map> adapters = - new HashMap<>(); + protected final Map> adapters = new HashMap<>(); // Key is DataverseName, Key of the value map is the Policy name - protected final Map> feedPolicies = - new HashMap<>(); + protected final Map> feedPolicies = new HashMap<>(); // Key is library dataverse. Key of value map is the library name protected final Map> libraries = new HashMap<>(); // Key is library dataverse. Key of value map is the feed name protected final Map> feeds = new HashMap<>(); // Key is DataverseName, Key of the value map is the Policy name - protected final Map> compactionPolicies = - new HashMap<>(); + protected final Map> compactionPolicies = new HashMap<>(); // Key is DataverseName, Key of value map is feedConnectionId protected final Map> feedConnections = new HashMap<>(); @@ -247,8 +243,7 @@ datatypes.remove(dataverse.getDataverseName()); adapters.remove(dataverse.getDataverseName()); compactionPolicies.remove(dataverse.getDataverseName()); - List markedFunctionsForRemoval = - new ArrayList<>(); + List markedFunctionsForRemoval = new ArrayList<>(); for (FunctionSignature signature : functions.keySet()) { if (signature.getNamespace().equals(dataverse.getDataverseName())) { markedFunctionsForRemoval.add(signature); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 93b19f1..b35fff7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -26,6 +26,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ClusterProperties; @@ -39,8 +41,6 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.IDatasetDetails; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java index e2b1761..e626892 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java @@ -20,8 +20,8 @@ import java.util.List; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.entities.Dataset; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java index b13f4c2..18799a5 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java @@ -23,8 +23,8 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.om.types.ARecordType; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 3b70ea9..9960645 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -28,6 +28,8 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.context.IStorageComponentProvider; @@ -38,8 +40,6 @@ import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; @@ -397,23 +397,23 @@ } public Triple buildFeedIntakeRuntime( - JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception { + JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception { Triple factoryOutput; - factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx, + factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx, getApplicationContext()); - ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(), + ARecordType recordType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME); IAdapterFactory adapterFactory = factoryOutput.first; FeedIntakeOperatorDescriptor feedIngestor = null; switch (factoryOutput.third) { case INTERNAL: - feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, recordType, + feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, adapterFactory, recordType, policyAccessor, factoryOutput.second); break; case EXTERNAL: - String libraryName = primaryFeed.getAdapterName().trim() + String libraryName = feed.getAdapterName().trim() .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0]; - feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName, + feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, libraryName, adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second); break; default: diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 9131692..2383eb6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -27,7 +27,7 @@ import java.util.logging.Logger; import java.util.stream.IntStream; -import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveNotificationHandler; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; @@ -179,8 +179,8 @@ public Dataset(Dataset dataset, boolean forRebalance, String targetNodeGroupName) { this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName, dataset.metaTypeDataverseName, dataset.metaTypeName, targetNodeGroupName, - dataset.compactionPolicyFactory, - dataset.compactionPolicyProperties, dataset.datasetDetails, dataset.hints, dataset.datasetType, + dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails, + dataset.hints, dataset.datasetType, forRebalance ? DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : dataset.datasetId, dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : dataset.rebalanceCount); } @@ -206,10 +206,12 @@ this.rebalanceCount = rebalanceCount; } + @Override public String getDataverseName() { return dataverseName; } + @Override public String getDatasetName() { return datasetName; } @@ -325,9 +327,9 @@ Map> disconnectJobList = new HashMap<>(); if (getDatasetType() == DatasetType.INTERNAL) { // prepare job spec(s) that would disconnect any active feeds involving the dataset. - ActiveLifecycleListener activeListener = - (ActiveLifecycleListener) metadataProvider.getApplicationContext().getActiveLifecycleListener(); - IActiveEntityEventsListener[] activeListeners = activeListener.getNotificationHandler().getEventListeners(); + ActiveNotificationHandler activeListener = + (ActiveNotificationHandler) metadataProvider.getApplicationContext().getActiveNotificationHandler(); + IActiveEntityEventsListener[] activeListeners = activeListener.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { if (listener.isEntityUsingDataset(this)) { throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, @@ -706,8 +708,8 @@ public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider metadataProvider) throws AlgebricksException { List> partitioningKeys = getPrimaryKeys(); int numPrimaryKeys = partitioningKeys.size(); - ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1 - + (hasMetaPart() ? 1 : 0)]; + ISerializerDeserializer[] primaryRecFields = + new ISerializerDeserializer[numPrimaryKeys + 1 + (hasMetaPart() ? 1 : 0)]; ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (hasMetaPart() ? 1 : 0)]; ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider(); List indicators = null; @@ -719,9 +721,9 @@ // Set the serde/traits for primary keys for (int i = 0; i < numPrimaryKeys; i++) { - IAType keyType = (indicators == null || indicators.get(i) == 0) - ? itemType.getSubFieldType(partitioningKeys.get(i)) - : metaType.getSubFieldType(partitioningKeys.get(i)); + IAType keyType = + (indicators == null || indicators.get(i) == 0) ? itemType.getSubFieldType(partitioningKeys.get(i)) + : metaType.getSubFieldType(partitioningKeys.get(i)); primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType); primaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); } @@ -731,8 +733,8 @@ primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); if (hasMetaPart()) { // Set the serde and traits for the meta record field - primaryRecFields[numPrimaryKeys + 1] = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(metaType); + primaryRecFields[numPrimaryKeys + 1] = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType); primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); } return new RecordDescriptor(primaryRecFields, primaryTypeTraits); @@ -758,9 +760,9 @@ indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator(); } for (int i = 0; i < numPrimaryKeys; i++) { - IAType keyType = (indicators == null || indicators.get(i) == 0) - ? recordType.getSubFieldType(partitioningKeys.get(i)) - : metaType.getSubFieldType(partitioningKeys.get(i)); + IAType keyType = + (indicators == null || indicators.get(i) == 0) ? recordType.getSubFieldType(partitioningKeys.get(i)) + : metaType.getSubFieldType(partitioningKeys.get(i)); cmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true); } return cmpFactories; @@ -803,8 +805,8 @@ // Gets an array of partition numbers for this dataset. protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException { - FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, - getDatasetName()); + FileSplit[] splitsForDataset = + metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, getDatasetName()); return IntStream.range(0, splitsForDataset.length).toArray(); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java index 5a85327..f263703 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.metadata.entities; -import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType; +import org.apache.asterix.active.IDataSourceAdapter.AdapterType; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.metadata.MetadataCache; import org.apache.asterix.metadata.api.IMetadataEntity; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java index 7f4e28d..8a7a9b8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java @@ -24,7 +24,7 @@ import java.io.DataInputStream; import java.util.Calendar; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.active.IDataSourceAdapter; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataException; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 94b36ed..d1001030 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -21,15 +21,15 @@ import java.rmi.RemoteException; import java.util.Map; +import org.apache.asterix.active.IAdapterFactory; +import org.apache.asterix.active.IDataSourceAdapter; +import org.apache.asterix.active.IDataSourceAdapter.AdapterType; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; -import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.provider.AdapterFactoryProvider; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java index 43d72e3..393af7b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java @@ -266,13 +266,16 @@ acquireDataverseReadLock(locks, dataverseName); } - public void startFeedBegin(LockList locks, String dataverseName, String feedName, - List feedConnections) throws AsterixException { + public void startFeedBegin(LockList locks, String dataverseName, String feedName) throws AsterixException { acquireDataverseReadLock(locks, dataverseName); acquireFeedReadLock(locks, feedName); + + } + + public void lockFeedConnections(LockList locks, List feedConnections) throws AsterixException { for (FeedConnection feedConnection : feedConnections) { // what if the dataset is in a different dataverse - String fqName = dataverseName + "." + feedConnection.getDatasetName(); + String fqName = feedConnection.getDataverseName() + "." + feedConnection.getDatasetName(); acquireDatasetReadLock(locks, fqName); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java index 5bb0aa9..fb655eb 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java @@ -28,11 +28,11 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.active.IAdapterFactory; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 4cd243c..d43af8c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -210,7 +210,7 @@ } @Override - public IJobLifecycleListener getActiveLifecycleListener() { + public IJobLifecycleListener getActiveNotificationHandler() { return activeLifeCycleListener; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java index 30ffebe..1506872 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java @@ -18,12 +18,14 @@ */ package org.apache.hyracks.api.job; +import java.util.List; + import org.apache.hyracks.api.exceptions.HyracksException; public interface IJobLifecycleListener { - public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException; + void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException; - public void notifyJobStart(JobId jobId) throws HyracksException; + void notifyJobStart(JobId jobId) throws HyracksException; - public void notifyJobFinish(JobId jobId) throws HyracksException; + void notifyJobFinish(JobId jobId, JobStatus jobStatus, List exceptions) throws HyracksException; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java index 5075081..26245e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ServiceContext; @@ -88,14 +89,14 @@ } } - public synchronized void notifyJobFinish(JobId jobId) throws HyracksException { + public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List exceptions) + throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { - l.notifyJobFinish(jobId); + l.notifyJobFinish(jobId, jobStatus, exceptions); } } - public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) - throws HyracksException { + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { l.notifyJobCreation(jobId, spec); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 3cc41c5..32403ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -41,6 +41,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.cc.PreDistributedJobStore; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; @@ -69,7 +70,7 @@ this.resultTTL = resultTTL; this.resultSweepThreshold = resultSweepThreshold; this.preDistributedJobStore = preDistributedJobStore; - jobResultLocations = new LinkedHashMap(); + jobResultLocations = new LinkedHashMap<>(); } @Override @@ -94,7 +95,7 @@ } @Override - public void notifyJobFinish(JobId jobId) throws HyracksException { + public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List exceptions) throws HyracksException { // Auto-generated method stub } @@ -189,7 +190,7 @@ @Override public synchronized long getResultTimestamp(JobId jobId) { - if (preDistributedJobStore.jobIsPredistributed(jobId)){ + if (preDistributedJobStore.jobIsPredistributed(jobId)) { return -1; } return getState(jobId).getTimestamp(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 45c7711..c1a7899 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -132,8 +132,8 @@ // Removes a pending job. JobRun jobRun = jobQueue.remove(jobId); if (jobRun != null) { - List exceptions = Collections - .singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); + List exceptions = + Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); // Since the job has not been executed, we only need to update its status and lifecyle here. jobRun.setStatus(JobStatus.FAILURE, exceptions); runMapArchive.put(jobId, jobRun); @@ -179,7 +179,7 @@ } catch (Exception e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); if (caughtException == null) { - caughtException = new HyracksException(e); + caughtException = HyracksException.create(e); } else { caughtException.addSuppressed(e); } @@ -208,7 +208,7 @@ CCServiceContext serviceCtx = ccs.getContext(); if (serviceCtx != null) { try { - serviceCtx.notifyJobFinish(jobId); + serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions()); } catch (HyracksException e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); caughtException = e; @@ -248,8 +248,6 @@ throw caughtException; } } - - @Override public Collection getRunningJobs() { @@ -320,9 +318,8 @@ try { run.getExecutor().startJob(); } catch (Exception e) { - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE, - Collections.singletonList(e))); + ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE, + Collections.singletonList(e))); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1875 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi