Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8EAE1200CE1 for ; Thu, 27 Jul 2017 08:36:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8D49A16A4B2; Thu, 27 Jul 2017 06:36:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3568B16A4B3 for ; Thu, 27 Jul 2017 08:35:59 +0200 (CEST) Received: (qmail 49236 invoked by uid 500); 27 Jul 2017 06:35:57 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 49214 invoked by uid 99); 27 Jul 2017 06:35:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Jul 2017 06:35:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 84E1CF32CA; Thu, 27 Jul 2017 06:35:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Date: Thu, 27 Jul 2017 06:35:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities archived-at: Thu, 27 Jul 2017 06:36:01 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java new file mode 100644 index 0000000..52d4225 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import java.util.List; + +import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; +import org.mockito.Mockito; + +public class TestClusterControllerActor extends Actor { + + private final ActiveNotificationHandler handler; + private final List allDatasets; + + public TestClusterControllerActor(String name, ActiveNotificationHandler handler, List allDatasets) { + super(name, null); + this.handler = handler; + this.allDatasets = allDatasets; + } + + public Action startActiveJob(JobId jobId, EntityId entityId) { + Action startJob = new Action() { + @Override + protected void doExecute(MetadataProvider actorMdProvider) throws Exception { + // succeed + JobSpecification jobSpecification = Mockito.mock(JobSpecification.class); + Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) + .thenReturn(entityId); + handler.notifyJobCreation(jobId, jobSpecification); + handler.notifyJobStart(jobId); + } + }; + add(startJob); + return startJob; + } + + public Action activeEvent(ActiveEvent event) { + Action delivery = new Action() { + @Override + protected void doExecute(MetadataProvider actorMdProvider) throws Exception { + handler.add(event); + } + }; + add(delivery); + return delivery; + } + + public Action jobFinish(JobId jobId, JobStatus jobStatus, List exceptions) { + Action delivery = new Action() { + @Override + protected void doExecute(MetadataProvider actorMdProvider) throws Exception { + handler.notifyJobFinish(jobId, jobStatus, exceptions); + } + }; + add(delivery); + return delivery; + } + + public List getAllDatasets() { + return allDatasets; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java new file mode 100644 index 0000000..d896995 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Semaphore; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IRetryPolicyFactory; +import org.apache.asterix.app.active.ActiveEntityEventsListener; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.metadata.LockList; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobIdFactory; +import org.apache.hyracks.api.job.JobStatus; + +public class TestEventsListener extends ActiveEntityEventsListener { + + public static enum Behavior { + SUCCEED, + RUNNING_JOB_FAIL, + FAIL_COMPILE, + FAIL_RUNTIME, + STEP_SUCCEED, + STEP_FAIL_COMPILE, + STEP_FAIL_RUNTIME + } + + private final Semaphore step = new Semaphore(0); + private final TestClusterControllerActor clusterController; + private final TestNodeControllerActor[] nodeControllers; + private final JobIdFactory jobIdFactory; + private Behavior onStart = Behavior.FAIL_COMPILE; + private Behavior onStop = Behavior.FAIL_COMPILE; + + public TestEventsListener(TestClusterControllerActor clusterController, TestNodeControllerActor[] nodeControllers, + JobIdFactory jobIdFactory, EntityId entityId, List datasets, IStatementExecutor statementExecutor, + ICcApplicationContext appCtx, IHyracksClientConnection hcc, AlgebricksAbsolutePartitionConstraint locations, + IRetryPolicyFactory retryPolicyFactory) throws HyracksDataException { + super(statementExecutor, appCtx, hcc, entityId, datasets, locations, TestEventsListener.class.getSimpleName(), + retryPolicyFactory); + this.clusterController = clusterController; + this.nodeControllers = nodeControllers; + this.jobIdFactory = jobIdFactory; + } + + public void allowStep() { + step.release(); + } + + private void step(Behavior behavior) throws HyracksDataException { + if (behavior == Behavior.STEP_FAIL_COMPILE || behavior == Behavior.STEP_FAIL_RUNTIME + || behavior == Behavior.STEP_SUCCEED) { + takeStep(); + } + } + + @SuppressWarnings("deprecation") + private void failCompile(Behavior behavior) throws HyracksDataException { + if (behavior == Behavior.FAIL_COMPILE || behavior == Behavior.STEP_FAIL_COMPILE) { + throw new HyracksDataException("Compilation Failure"); + } + } + + private synchronized void takeStep() throws HyracksDataException { + try { + while (!step.tryAcquire()) { + notifyAll(); + wait(10); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + + @SuppressWarnings("deprecation") + @Override + protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + step(onStart); + failCompile(onStart); + JobId jobId = jobIdFactory.create(); + Action startJob = clusterController.startActiveJob(jobId, entityId); + try { + startJob.sync(); + } catch (InterruptedException e) { + throw HyracksDataException.create(e); + } + WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, + EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED)); + if (onStart == Behavior.FAIL_RUNTIME || onStart == Behavior.STEP_FAIL_RUNTIME) { + clusterController.jobFinish(jobId, JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("RuntimeFailure"))); + } else { + for (int i = 0; i < nodeControllers.length; i++) { + TestNodeControllerActor nodeController = nodeControllers[0]; + nodeController.registerRuntime(jobId, entityId, i); + } + } + try { + subscriber.sync(); + if (subscriber.getFailure() != null) { + throw HyracksDataException.create(subscriber.getFailure()); + } + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + @SuppressWarnings("deprecation") + @Override + protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException { + step(onStop); + failCompile(onStop); + try { + Set waitFor; + if (state == ActivityState.STOPPING) { + waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED); + } else if (state == ActivityState.SUSPENDING) { + waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED); + } else { + throw new IllegalStateException("stop with what intention??"); + } + WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor); + if (onStop == Behavior.RUNNING_JOB_FAIL) { + clusterController.jobFinish(jobId, JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("RuntimeFailure"))); + } else { + for (int i = 0; i < nodeControllers.length; i++) { + TestNodeControllerActor nodeController = nodeControllers[0]; + nodeController.deRegisterRuntime(jobId, entityId, i).sync(); + } + clusterController.jobFinish(jobId, JobStatus.TERMINATED, Collections.emptyList()); + } + subscriber.sync(); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + return null; + } + + public void onStart(Behavior behavior) { + this.onStart = behavior; + } + + public void onStop(Behavior behavior) { + this.onStop = behavior; + } + + @Override + protected void setRunning(MetadataProvider metadataProvider, boolean running) + throws HyracksDataException, AlgebricksException { + try { + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + LockList locks = metadataProvider.getLocks(); + lockManager.acquireDataverseReadLock(locks, entityId.getDataverse()); + lockManager.acquireActiveEntityWriteLock(locks, entityId.getDataverse() + '.' + entityId.getEntityName()); + // persist entity + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + @Override + protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + return doStop(metadataProvider); + } + + @Override + protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + doStart(metadataProvider); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java new file mode 100644 index 0000000..e7e21b6 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.ActiveEvent.Kind; +import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.api.job.JobId; + +public class TestNodeControllerActor extends Actor { + + private final String id; + private final TestClusterControllerActor clusterController; + + public TestNodeControllerActor(String name, TestClusterControllerActor clusterController) { + super("NC: " + name, null); + this.id = name; + this.clusterController = clusterController; + } + + public Action registerRuntime(JobId jobId, EntityId entityId, int partition) { + Action registration = new Action() { + @Override + protected void doExecute(MetadataProvider actorMdProvider) throws Exception { + ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, + new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId, + ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null)); + clusterController.activeEvent(event); + } + }; + add(registration); + return registration; + } + + public Action deRegisterRuntime(JobId jobId, EntityId entityId, int partition) { + Action registration = new Action() { + @Override + protected void doExecute(MetadataProvider actorMdProvider) throws Exception { + ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, + new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId, + ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null)); + clusterController.activeEvent(event); + } + }; + add(registration); + return registration; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java new file mode 100644 index 0000000..5f715af --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import java.util.List; +import java.util.concurrent.Semaphore; + +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.metadata.api.IActiveEntityController; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.metadata.utils.MetadataLockUtil; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class TestUserActor extends Actor { + + private TestClusterControllerActor clusterController; + private IMetadataLockManager lockManager; + + public TestUserActor(String name, MetadataProvider metadataProvider, TestClusterControllerActor clusterController) { + super(name, metadataProvider); + this.clusterController = clusterController; + this.lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + } + + public Action startActivity(IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = actionListener.getEntityId().getDataverse(); + String entityName = actionListener.getEntityId().getEntityName(); + try { + lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); + List datasets = actionListener.getDatasets(); + for (Dataset dataset : datasets) { + MetadataLockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataverseName, + DatasetUtil.getFullyQualifiedName(dataset)); + } + actionListener.start(mdProvider); + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action stopActivity(IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = actionListener.getEntityId().getDataverse(); + String entityName = actionListener.getEntityId().getEntityName(); + try { + lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); + List datasets = actionListener.getDatasets(); + for (Dataset dataset : datasets) { + MetadataLockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataverseName, + DatasetUtil.getFullyQualifiedName(dataset)); + } + actionListener.stop(mdProvider); + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action suspendActivity(IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = actionListener.getEntityId().getDataverse(); + String entityName = actionListener.getEntityId().getEntityName(); + List datasets = actionListener.getDatasets(); + try { + lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); + for (Dataset dataset : datasets) { + lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); + } + actionListener.suspend(mdProvider); + } catch (Exception e) { + // only release in case of failure + mdProvider.getLocks().reset(); + throw e; + } + } + }; + add(action); + return action; + } + + public Action resumeActivity(IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = actionListener.getEntityId().getDataverse(); + String entityName = actionListener.getEntityId().getEntityName(); + try { + lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); + List datasets = actionListener.getDatasets(); + for (Dataset dataset : datasets) { + lockManager.upgradeDatasetLockToWrite(mdProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); + lockManager.downgradeDatasetLockToExclusiveModify(mdProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); + } + actionListener.resume(mdProvider); + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action addDataset(Dataset dataset, IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String entityDataverseName = actionListener.getEntityId().getDataverse(); + String entityName = actionListener.getEntityId().getEntityName(); + try { + lockManager.acquireActiveEntityReadLock(mdProvider.getLocks(), + entityDataverseName + '.' + entityName); + lockManager.acquireDatasetWriteLock(mdProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); + List datasets = clusterController.getAllDatasets(); + if (datasets.contains(dataset)) { + throw new HyracksDataException("Dataset " + dataset + " already exists"); + } + actionListener.add(dataset); + datasets.add(dataset); + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action dropDataset(Dataset dataset, IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String entityDataverseName = actionListener.getEntityId().getDataverse(); + String entityName = actionListener.getEntityId().getEntityName(); + try { + lockManager.acquireActiveEntityReadLock(mdProvider.getLocks(), + entityDataverseName + '.' + entityName); // we have to first read lock all active entities before deleting a dataset + lockManager.acquireDatasetWriteLock(mdProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); + List datasets = clusterController.getAllDatasets(); + if (!datasets.contains(dataset)) { + throw new HyracksDataException("Dataset " + dataset + " does not exist"); + } + actionListener.remove(dataset); + datasets.remove(dataset); + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action addIndex(Dataset dataset, IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = dataset.getDataverseName(); + String datasetFullyQualifiedName = dataverseName + '.' + dataset.getDatasetName(); + String indexFullyQualifiedName = datasetFullyQualifiedName + ".index"; + try { + MetadataLockUtil.createIndexBegin(lockManager, mdProvider.getLocks(), dataverseName, + datasetFullyQualifiedName); + if (actionListener.isActive()) { + throw new RuntimeDataException(ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY, + indexFullyQualifiedName, actionListener.getEntityId(), actionListener.getState()); + } + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action dropIndex(Dataset dataset, IActiveEntityController actionListener) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = dataset.getDataverseName(); + String datasetFullyQualifiedName = dataverseName + '.' + dataset.getDatasetName(); + try { + MetadataLockUtil.dropIndexBegin(lockManager, mdProvider.getLocks(), dataverseName, + datasetFullyQualifiedName); + if (actionListener.isActive()) { + throw new RuntimeDataException( + ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY, + datasetFullyQualifiedName + ".index", actionListener.getEntityId(), + actionListener.getState()); + } + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action query(Dataset dataset, Semaphore semaphore) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + String dataverseName = dataset.getDataverseName(); + String datasetFullyQualifiedName = dataverseName + '.' + dataset.getDatasetName(); + try { + lockManager.acquireDataverseReadLock(mdProvider.getLocks(), dataverseName); + lockManager.acquireDatasetReadLock(mdProvider.getLocks(), datasetFullyQualifiedName); + if (!semaphore.tryAcquire()) { + semaphore.acquire(); + } + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } + + public Action suspendAllActivities(ActiveNotificationHandler handler) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + handler.suspend(mdProvider); + } + }; + add(action); + return action; + } + + public Action resumeAllActivities(ActiveNotificationHandler handler) { + Action action = new Action() { + @Override + protected void doExecute(MetadataProvider mdProvider) throws Exception { + try { + handler.resume(mdProvider); + } finally { + mdProvider.getLocks().reset(); + } + } + }; + add(action); + return action; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 8442162..10b528f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -472,8 +472,10 @@ public class TestExecutor { } catch (Exception e) { // whoops, not JSON (e.g. 404) - just include the body GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errorBody); - throw new Exception("HTTP operation failed:" + "\nSTATUS LINE: " + httpResponse.getStatusLine() - + "\nERROR_BODY: " + errorBody, e); + Exception failure = new Exception("HTTP operation failed:" + "\nSTATUS LINE: " + + httpResponse.getStatusLine() + "\nERROR_BODY: " + errorBody); + failure.addSuppressed(e); + throw failure; } throw new ParsedException("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + httpResponse.getStatusLine() + "\nSUMMARY: " + errors[2].split("\n")[0], errors[2]); @@ -1173,7 +1175,8 @@ public class TestExecutor { break; } catch (TimeoutException e) { if (responsesReceived == 0) { - throw new Exception("Poll limit (" + timeoutSecs + "s) exceeded without obtaining *any* result from server"); + throw new Exception( + "Poll limit (" + timeoutSecs + "s) exceeded without obtaining *any* result from server"); } else { throw new Exception("Poll limit (" + timeoutSecs + "s) exceeded without obtaining expected result"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml index a5e6fbb..e99686d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -237,13 +237,13 @@ start-started-feed - Feed TweetFeed is started already. + experiments.TweetFeed(Feed) is already started stop-stopped-feed - Feed TweetFeed is not started. + new_experiments.TweetFeed(Feed) cannot be stopped because its state is STOPPED @@ -6421,7 +6421,7 @@ drop-dataverse - org.apache.asterix.metadata.MetadataException: Cannot drop dataverse. Type a.a used by dataset b.b1 + Cannot drop dataverse. Type a.a used by dataset b.b1 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java index c3cf86b..61ded27 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java @@ -55,7 +55,7 @@ public interface IClusterEventsSubscriber { * @param previousState * @param newState */ - default void notifyStateChange(ClusterState previousState, ClusterState newState) { + default void notifyStateChange(ClusterState newState) { // default is no-op } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java new file mode 100644 index 0000000..4408d84 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IMetadataLockManager.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.metadata.LockList; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; + +public interface IMetadataLockManager { + + /** + * Acquire read lock on the dataverse + * + * @param locks + * the lock list to add the new lock to + * @param dataverseName + * the dataverse name + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDataverseReadLock(LockList locks, String dataverseName) throws AsterixException; + + /** + * Acquire write lock on the dataverse + * + * @param locks + * the lock list to add the new lock to + * @param dataverseName + * the dataverse name + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDataverseWriteLock(LockList locks, String dataverseName) throws AsterixException; + + /** + * Acquire read lock on the dataset (for queries) + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDatasetReadLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException; + + /** + * Acquire write lock on the dataset (for dataset create, dataset drop, and index drop) + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDatasetWriteLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException; + + /** + * Acquire modify lock on the dataset (for inserts, upserts, deletes) Mutually exclusive with create index lock + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDatasetModifyLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException; + + /** + * Acquire create index lock on the dataset (for index creation) Mutually exclusive with modify lock + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDatasetCreateIndexLock(LockList locks, String datasetFullyQualifiedName) throws AsterixException; + + /** + * Acquire exclusive modify lock on the dataset. only a single thread can acquire this lock and it is mutually + * exclusive with modify locks and index build lock + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDatasetExclusiveModificationLock(LockList locks, String datasetFullyQualifiedName) + throws AsterixException; + + /** + * Acquire read lock on the function + * + * @param locks + * the lock list to add the new lock to + * @param functionFullyQualifiedName + * the fully qualified name of the function + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireFunctionReadLock(LockList locks, String functionFullyQualifiedName) throws AsterixException; + + /** + * Acquire write lock on the function + * + * @param locks + * the lock list to add the new lock to + * @param functionFullyQualifiedName + * the fully qualified name of the function + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireFunctionWriteLock(LockList locks, String functionFullyQualifiedName) throws AsterixException; + + /** + * Acquire read lock on the node group + * + * @param locks + * the lock list to add the new lock to + * @param nodeGroupName + * the name of the node group + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireNodeGroupReadLock(LockList locks, String nodeGroupName) throws AsterixException; + + /** + * Acquire write lock on the node group + * + * @param locks + * the lock list to add the new lock to + * @param nodeGroupName + * the name of the node group + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireNodeGroupWriteLock(LockList locks, String nodeGroupName) throws AsterixException; + + /** + * Acquire read lock on the active entity + * + * @param locks + * the lock list to add the new lock to + * @param entityFullyQualifiedName + * the fully qualified name of the active entity + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireActiveEntityReadLock(LockList locks, String entityFullyQualifiedName) throws AsterixException; + + /** + * Acquire write lock on the active entity + * + * @param locks + * the lock list to add the new lock to + * @param entityFullyQualifiedName + * the fully qualified name of the active entity + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireActiveEntityWriteLock(LockList locks, String entityFullyQualifiedName) throws AsterixException; + + /** + * Acquire read lock on the feed policy + * + * @param locks + * the lock list to add the new lock to + * @param feedPolicyFullyQualifiedName + * the fully qualified name of the feed policy + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireFeedPolicyWriteLock(LockList locks, String feedPolicyFullyQualifiedName) throws AsterixException; + + /** + * Acquire write lock on the feed policy + * + * @param locks + * the lock list to add the new lock to + * @param feedPolicyFullyQualifiedName + * the fully qualified name of the feed policy + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireFeedPolicyReadLock(LockList locks, String feedPolicyFullyQualifiedName) throws AsterixException; + + /** + * Acquire read lock on the merge policy + * + * @param locks + * the lock list to add the new lock to + * @param mergePolicyFullyQualifiedName + * the fully qualified name of the merge policy + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireMergePolicyReadLock(LockList locks, String mergePolicyFullyQualifiedName) throws AsterixException; + + /** + * Acquire write lock on the merge policy + * + * @param locks + * the lock list to add the new lock to + * @param mergePolicyFullyQualifiedName + * the fully qualified name of the merge policy + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireMergePolicyWriteLock(LockList locks, String mergePolicyFullyQualifiedName) throws AsterixException; + + /** + * Acquire read lock on the data type + * + * @param locks + * the lock list to add the new lock to + * @param datatypeFullyQualifiedName + * the fully qualified name of the data type + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDataTypeReadLock(LockList locks, String datatypeFullyQualifiedName) throws AsterixException; + + /** + * Acquire write lock on the data type + * + * @param locks + * the lock list to add the new lock to + * @param datatypeFullyQualifiedName + * the fully qualified name of the data type + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireDataTypeWriteLock(LockList locks, String datatypeFullyQualifiedName) throws AsterixException; + + /** + * Acquire read lock on the extension entity + * + * @param locks + * the lock list to add the new lock to + * @param extension + * the extension key + * @param extensionEntityFullyQualifiedName + * the fully qualified name of the extension entity + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireExtensionReadLock(LockList locks, String extension, String extensionEntityFullyQualifiedName) + throws AsterixException; + + /** + * Acquire write lock on the extension entity + * + * @param locks + * the lock list to add the new lock to + * @param extension + * the extension key + * @param extensionEntityFullyQualifiedName + * the fully qualified name of the extension entity + * @throws AsterixException + * if lock couldn't be acquired + */ + void acquireExtensionWriteLock(LockList locks, String extension, String extensionEntityFullyQualifiedName) + throws AsterixException; + + /** + * Upgrade a previously acquired exclusive modification lock on the dataset to a write lock + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AlgebricksException + * if lock couldn't be upgraded + */ + void upgradeDatasetLockToWrite(LockList locks, String datasetFullyQualifiedName) throws AlgebricksException; + + /** + * Downgrade an upgraded dataset write lock to an exclusive modification lock + * + * @param locks + * the lock list to add the new lock to + * @param datasetFullyQualifiedName + * the fully qualified name of the dataset + * @throws AlgebricksException + * if lock couldn't be downgraded + */ + void downgradeDatasetLockToExclusiveModify(LockList locks, String datasetFullyQualifiedName) + throws AlgebricksException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java index 6211af4..b4559c8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java @@ -20,11 +20,22 @@ package org.apache.asterix.common.cluster; import org.apache.asterix.common.api.IClusterEventsSubscriber; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IGlobalRecoveryManager extends IClusterEventsSubscriber { /** - * Starts the global recovery process if the cluster state changed to ACTIVE. + * Starts the global recovery process after the cluster state has changed to ACTIVE. + * + * @param appCtx + * the application context + * @throws HyracksDataException + * if the global recovery fails */ - public void startGlobalRecovery(ICcApplicationContext appCtx); + void startGlobalRecovery(ICcApplicationContext appCtx) throws HyracksDataException; + + /** + * @return true, if global recovery has been completed successfully + */ + boolean isRecoveryCompleted(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 249bd56..3eff214 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -19,6 +19,8 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.replication.IFaultToleranceStrategy; @@ -63,9 +65,9 @@ public interface ICcApplicationContext extends IApplicationContext { IFaultToleranceStrategy getFaultToleranceStrategy(); /** - * @return the active lifecycle listener at Cluster controller + * @return the active notification handler at Cluster controller */ - IJobLifecycleListener getActiveLifecycleListener(); + IJobLifecycleListener getActiveNotificationHandler(); /** * @return a new instance of {@link IHyracksClientConnection} @@ -90,4 +92,14 @@ public interface ICcApplicationContext extends IApplicationContext { * @return the extension manager instance */ Object getExtensionManager(); + + /** + * @return the metadata lock manager + */ + IMetadataLockManager getMetadataLockManager(); + + /** + * @return the cluster state manager + */ + IClusterStateManager getClusterStateManager(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java index 7cb7d8a..dc884f1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/AsterixException.java @@ -25,6 +25,10 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; public class AsterixException extends AlgebricksException { private static final long serialVersionUID = 1L; + /** + * @deprecated Instead, use a constructor with error code + */ + @Deprecated public AsterixException(String message) { super(message); } @@ -34,10 +38,11 @@ public class AsterixException extends AlgebricksException { } - public static AsterixException create(int errorCode, Serializable... params) { - return new AsterixException(errorCode, params); - } - + /** + * @deprecated When creating a constructor with cause, + * create AlgebricksException using AlgebricksException.create(Throwable th); + */ + @Deprecated public AsterixException(Throwable cause) { super(cause); } @@ -47,7 +52,15 @@ public class AsterixException extends AlgebricksException { addSuppressed(cause); } + /** + * @deprecated Instead, use a constructor with error code + */ + @Deprecated public AsterixException(String message, Throwable cause) { super(message, cause); } + + public static AsterixException create(int errorCode, Serializable... params) { + return new AsterixException(errorCode, params); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java index b53eec4..693b1c0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/CompilationException.java @@ -31,8 +31,7 @@ public class CompilationException extends AlgebricksException { } public CompilationException(int errorCode, Throwable cause, Serializable... params) { - super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), params); - addSuppressed(cause); + super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), cause, params); } /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 9d889ea..a00ed99 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -67,6 +67,7 @@ public class ErrorCode { public static final int POLYGON_INVALID_COORDINATE = 24; public static final int POLYGON_3_POINTS = 25; public static final int POLYGON_INVALID = 26; + public static final int OPERATION_NOT_SUPPORTED = 27; public static final int INSTANTIATION_ERROR = 100; @@ -114,6 +115,12 @@ public class ErrorCode { public static final int INDEX_ILLEGAL_ENFORCED_NON_OPTIONAL = 1041; public static final int INDEX_ILLEGAL_NON_ENFORCED_TYPED = 1042; public static final int INDEX_RTREE_MULTIPLE_FIELDS_NOT_ALLOWED = 1043; + public static final int REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE = 1044; + public static final int ILLEGAL_LOCK_UPGRADE_OPERATION = 1045; + public static final int ILLEGAL_LOCK_DOWNGRADE_OPERATION = 1046; + public static final int UPGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED = 1047; + public static final int DOWNGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED = 1048; + public static final int LOCK_WAS_ACQUIRED_DIFFERENT_OPERATION = 1049; // Feed errors public static final int DATAFLOW_ILLEGAL_STATE = 3001; @@ -200,6 +207,26 @@ public class ErrorCode { public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086; public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087; public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088; + public static final int ACTIVE_ENTITY_ALREADY_STARTED = 3089; + public static final int ACTIVE_ENTITY_CANNOT_BE_STOPPED = 3090; + public static final int CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY = 3091; + public static final int CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY = 3092; + public static final int ACTIVE_ENTITY_IS_ALREADY_REGISTERED = 3093; + public static final int CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY = 3094; + public static final int CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY = 3095; + public static final int ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED = 3096; + public static final int ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED = 3097; + public static final int CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER = 3098; + public static final int DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER = 3099; + public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 3100; + public static final int DOUBLE_RECOVERY_ATTEMPTS = 3101; + public static final int UNREPORTED_TASK_FAILURE_EXCEPTION = 3102; + public static final int ACTIVE_ENTITY_ALREADY_SUSPENDED = 3103; + public static final int ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE = 3104; + public static final int ACTIVE_RUNTIME_IS_ALREADY_REGISTERED = 3105; + public static final int ACTIVE_RUNTIME_IS_NOT_REGISTERED = 3106; + public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107; + public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java new file mode 100644 index 0000000..f04d19d --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/MetadataException.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.common.exceptions; + +import java.io.Serializable; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class MetadataException extends CompilationException { + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(MetadataException.class.getName()); + + @Deprecated + /** + * @Deprecated Instead, use a constructor with error code + * @param message + */ + public MetadataException(String message) { + super(message); + } + + @Deprecated + /** + * @Deprecated When creating a constructor with cause, + * create AlgebricksException using AlgebricksException.create(Throwable th); + * @param cause + */ + public MetadataException(Throwable cause) { + super(cause); + } + + @Deprecated + /** + * @Deprecated When creating a constructor with cause, + * create AlgebricksException using AlgebricksException.create(Throwable th); + * @param cause + */ + public MetadataException(String message, Throwable cause) { + super(message, cause); + } + + public MetadataException(int errorCode, Serializable... params) { + super(errorCode, params); + } + + public MetadataException(int errorCode, Throwable cause, Serializable... params) { + super(errorCode, cause, params); + } + + public static MetadataException create(Throwable cause) { + if (cause instanceof MetadataException || cause == null) { + return (MetadataException) cause; + } + if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) { + LOGGER.log(Level.WARNING, "Wrapping an InterruptedException in " + MetadataException.class.getSimpleName() + + " and current thread is not interrupted", cause); + } + return new MetadataException(cause); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java index 0c099fb..85bfaa5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java @@ -31,7 +31,6 @@ public class RuntimeDataException extends HyracksDataException { } public RuntimeDataException(int errorCode, Throwable cause, Serializable... params) { - super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), params); - addSuppressed(cause); + super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), cause, params); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java index a0e6e71..276e294 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java @@ -23,6 +23,16 @@ import java.util.List; public interface IDataset { /** + * @return the dataverse name + */ + String getDataverseName(); + + /** + * @return the dataset name + */ + String getDatasetName(); + + /** * @return the list of primary keys for the dataset */ List> getPrimaryKeys(); @@ -31,5 +41,4 @@ public interface IDataset { * @return the bloom filter fields indexes for the primary index of the dataset */ int[] getPrimaryBloomFilterFields(); - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java new file mode 100644 index 0000000..ba17b0c --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.metadata; + +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.MetadataException; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; + +/** + * A Metadata lock local to compilation node + */ +public interface IMetadataLock { + + enum Mode { + READ, + MODIFY, + INDEX_BUILD, + EXCLUSIVE_MODIFY, + UPGRADED_WRITE, + WRITE; + + public boolean contains(Mode mode) { + if (mode == this) { + return true; + } + if (this == Mode.WRITE) { + return true; + } + return mode == Mode.READ; + } + } + + /** + * Acquire a lock + * + * @param mode + * lock mode + */ + void lock(IMetadataLock.Mode mode); + + /** + * Release a lock + * + * @param mode + * lock mode + */ + void unlock(IMetadataLock.Mode mode); + + /** + * Get the lock's key + * + * @return the key identiying the lock + */ + String getKey(); + + /** + * upgrade the lock + * + * @param from + * @param to + * @throws AlgebricksException + */ + default void upgrade(Mode from, Mode to) throws AlgebricksException { + throw new MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to); + } + + /** + * downgrade the lock + * + * @param from + * @param to + * @throws AlgebricksException + */ + default void downgrade(Mode from, Mode to) throws AlgebricksException { + throw new MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java new file mode 100644 index 0000000..6f2bc39 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.metadata; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.metadata.IMetadataLock.Mode; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; + +/** + * The LockList is used for two phase locking. + */ +public class LockList { + private final List> locks = new ArrayList<>(); + private final HashMap indexes = new HashMap<>(); + private boolean lockPhase = true; + + /** + * Acquires a lock. + * + * @param mode + * the lock mode. + * @param lock + * the lock object. + */ + public void add(IMetadataLock.Mode mode, IMetadataLock lock) throws AsterixException { + if (isContained(mode, lock)) { + return; + } + lock.lock(mode); + indexes.put(lock.getKey(), locks.size()); + locks.add(MutablePair.of(lock, mode)); + } + + private boolean isContained(Mode mode, IMetadataLock lock) throws AsterixException { + if (!lockPhase) { + throw new AsterixException(ErrorCode.COMPILATION_TWO_PHASE_LOCKING_VIOLATION); + } + Integer index = indexes.get(lock.getKey()); + if (index != null) { + Mode acquired = locks.get(index).right; + if (!acquired.contains(mode)) { + throw new AsterixException(ErrorCode.LOCK_WAS_ACQUIRED_DIFFERENT_OPERATION, mode, acquired); + } + return true; + } + return false; + } + + public void upgrade(Mode to, IMetadataLock lock) throws AlgebricksException { + if (!lockPhase) { + throw new AsterixException(ErrorCode.COMPILATION_TWO_PHASE_LOCKING_VIOLATION); + } + Integer index = indexes.get(lock.getKey()); + if (index == null) { + throw new AsterixException(ErrorCode.UPGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED); + } + MutablePair pair = locks.get(index); + Mode from = pair.getRight(); + if (from == to) { + return; + } + lock.upgrade(from, to); + pair.setRight(to); + } + + public void downgrade(Mode mode, IMetadataLock lock) throws AlgebricksException { + if (!lockPhase) { + throw new AsterixException(ErrorCode.COMPILATION_TWO_PHASE_LOCKING_VIOLATION); + } + Integer index = indexes.get(lock.getKey()); + if (index == null) { + throw new AsterixException(ErrorCode.DOWNGRADE_FAILED_LOCK_WAS_NOT_ACQUIRED); + } + MutablePair pair = locks.get(index); + Mode acquired = pair.getRight(); + lock.downgrade(acquired, mode); + pair.setRight(mode); + } + + /** + * Once unlock() is called, no caller can call add(IMetadataLock.Mode mode, IMetadataLock lock), + * except that reset() is called. + */ + public void unlock() { + for (int i = locks.size() - 1; i >= 0; i--) { + MutablePair pair = locks.get(i); + pair.getLeft().unlock(pair.getRight()); + } + locks.clear(); + indexes.clear(); + lockPhase = false; + } + + /** + * Clears the state and starts another pass of two phase locking again. + */ + public void reset() { + unlock(); + lockPhase = true; + } + + @Override + public String toString() { + return "{\"phase\" : \"" + (lockPhase ? "lock" : "unlock") + "\", \"locks\" : " + locks + "}"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 9d4db56..9ed6aa5 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -49,7 +49,7 @@ 12 = Invalid implicit scalar to collection coercion in %1$s 14 = Property %1$s not set 15 = Storage metadata directory of %1$s in %2$s already exists -16 = Storage metadata directory of %1$s in %2$s couldn't be created +16 = Storage metadata directory of %1$s in %2$s could not be created 17 = Unknown external file pending operation %1$s 18 = Cannot convert the %1$s type to the %2$s type. 19 = Cannot convert integer types. The source type should be one of %1$s. @@ -60,6 +60,7 @@ 24 = Invalid coordinate 25 = Polygon must have at least 3 points 26 = %1$s can not be an instance of polygon +27 = Operation not supported 100 = Unable to instantiate class %1$s @@ -83,7 +84,7 @@ 1024 = Identifier %1$s is not found in AQL+ meta-scope 1025 = There is no such join type in AQL+ 1026 = The given function expression %1$s cannot utilize index -1027 = Dataset of type %1$s doesn't have a primary index +1027 = Dataset of type %1$s does not have a primary index 1028 = Query parameter %1$s is not supported 1029 = No metadata exists for dataset %1$s 1030 = The subtree does not have any data source @@ -100,6 +101,12 @@ 1041 = Cannot create enforced index on \"%1$s\" field with non-optional type 1042 = Cannot create non-enforced typed index of this kind: %1$s 1043 = Cannot use %1$s fields as key for the R-tree index. There can be only one field as a key for the R-tree index. +1044 = Communication-related exception occurred during the execution of a remote method call +1045 = Illegal attempt to upgrade metadata lock from %1$s to %2$s +1046 = Illegal attempt to downgrade metadata lock from %1$s to %2$s +1047 = Metadata lock cannot be upgraded! because it was not acquired before +1048 = Metadata lock cannot be downgraded! because it was not acquired before +1049 = Metadata lock cannot be acquired for %1$s since it is already acquired for %2$s # Feed Errors 3001 = Illegal state. @@ -111,7 +118,7 @@ 3007 = Twitter4J library not found! 3008 = Unable to ingest data 3009 = Exception in get record type %1$s for feed -3010 = Doesn't support Hive data with list of non-primitive types +3010 = Does not support Hive data with list of non-primitive types 3011 = Cannot get hive type for field of type %1$s 3012 = Failed to get columns of record 3013 = Cannot deserialize Hive records with no closed columns @@ -189,6 +196,26 @@ 3086 = Cannot find record reader %1$s with specified configuration 3087 = Cannot find function %1$s 3088 = %1$s is not a valid runtime Id +3089 = %1$s is already started and has state %2$s +3090 = %1$s cannot be stopped because its state is %2$s +3091 = Cannot add dataset to %1$s because its state is %2$s +3092 = Cannot remove dataset from %1$s because its state is %2$s +3093 = %1$s is already registered +3094 = Cannot create index on dataset %1$s because it is connected to %2$s with state %3$s +3095 = Cannot drop index of dataset %1$s because it is connected to %2$s with state %3$s +3096 = Active Notification Handler is suspended +3097 = Active Entity %1$s has not been registered +3098 = Cannot deregister %1$s because it is active +3099 = Attempt to initialize an initialized Active Notification Handler +3100 = Failed to shutdown event processor for %1$s +3101 = Recovery request while recovery is currently ongoing +3102 = Unreported exception causing task failure +3103 = %1$s is already suspended and has state %2$s +3104 = %1$s cannot be resumed from state %2$s +3105 = %1$s is already registered +3106 = %1$s is not registered +3107 = Active Notification Handler is already suspended +3108 = Feed stopped while waiting for a new record # Lifecycle management errors 4000 = Partition id %1$d for node %2$s already in use by node %3$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java index 48df79b..e62672d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; * to be implemented by each adapter irrespective of the the kind of * adapter(pull or push). */ +@FunctionalInterface public interface IDataSourceAdapter extends Serializable { public enum AdapterType { @@ -38,6 +39,7 @@ public interface IDataSourceAdapter extends Serializable { /** * Triggers the adapter to begin ingesting data from the external source. + * * @param partition * The adapter could be running with a degree of parallelism. * partition corresponds to the i'th parallel instance. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index bcf5e25..c87fe2d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -63,4 +63,8 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl public abstract boolean handleException(Throwable th) throws HyracksDataException; public abstract String getStats(); + + public void fail() throws HyracksDataException { + tupleForwarder.fail(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 1e62159..5b9b96f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -21,6 +21,8 @@ package org.apache.asterix.external.dataflow; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordReader; @@ -30,6 +32,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.log4j.Level; import org.apache.log4j.Logger; public class FeedRecordDataFlowController extends AbstractFeedDataFlowController { @@ -57,8 +60,8 @@ public class FeedRecordDataFlowController extends AbstractFeedDataFlowControl try { failed = false; tupleForwarder.initialize(ctx, writer); - while (recordReader.hasNext()) { - IRawRecord record = recordReader.next(); + while (hasNext()) { + IRawRecord record = next(); if (record == null) { flush(); Thread.sleep(INTERVAL); // NOSONAR: No one notifies the sleeping thread @@ -70,25 +73,46 @@ public class FeedRecordDataFlowController extends AbstractFeedDataFlowControl failedRecordsCount++; } } - } catch (InterruptedException e) { - //TODO: Find out what could cause an interrupted exception beside termination of a job/feed - LOGGER.warn("Feed has been interrupted. Closing the feed", e); - failed = true; - try { - finish(); - } catch (HyracksDataException hde) { - e.addSuppressed(hde); + } catch (HyracksDataException e) { + LOGGER.log(Level.WARN, e); + //if interrupted while waiting for a new record, then it is safe to not fail forward + if (e.getComponent() == ErrorCode.ASTERIX + && e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD) { + // Do nothing + } else { + failed = true; + throw e; } - throw e; } catch (Exception e) { failed = true; - tupleForwarder.flush(); LOGGER.warn("Failure while operating a feed source", e); throw HyracksDataException.create(e); } finish(); } + private IRawRecord next() throws HyracksDataException { + try { + return recordReader.next(); + } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline + throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + private boolean hasNext() throws HyracksDataException { + boolean hasNext; + try { + hasNext = recordReader.hasNext(); + } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline + throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + return hasNext; + } + private void finish() throws HyracksDataException { HyracksDataException hde = null; try { @@ -194,6 +218,7 @@ public class FeedRecordDataFlowController extends AbstractFeedDataFlowControl return dataParser; } + @Override public String getStats() { return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": " + failedRecordsCount + "}"; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java index 4177ea6..3a8130b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java @@ -40,6 +40,7 @@ public class FeedTupleForwarder implements ITupleForwarder { private IFrameWriter writer; private boolean paused = false; private boolean initialized; + private boolean failed; public FeedTupleForwarder(FeedLogManager feedLogManager) { this.feedLogManager = feedLogManager; @@ -67,7 +68,8 @@ public class FeedTupleForwarder implements ITupleForwarder { try { wait(); } catch (InterruptedException e) { - throw new HyracksDataException(e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } } } @@ -88,7 +90,7 @@ public class FeedTupleForwarder implements ITupleForwarder { public void close() throws HyracksDataException { Throwable throwable = null; try { - if (appender.getTupleCount() > 0) { + if (!failed && appender.getTupleCount() > 0) { FrameUtils.flushFrame(frame.getBuffer(), writer); } } catch (Throwable th) { @@ -116,4 +118,9 @@ public class FeedTupleForwarder implements ITupleForwarder { public void flush() throws HyracksDataException { appender.flush(writer); } + + public void fail() throws HyracksDataException { + failed = true; + writer.fail(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index e6d81d3..9f32a25 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -55,4 +55,8 @@ public class FeedAdapter implements IDataSourceAdapter { public String getStats() { return controller.getStats(); } + + public void fail() throws HyracksDataException { + controller.fail(); + } }