From: mhubail@apache.org
To: commits@asterixdb.incubator.apache.org
Subject: incubator-asterixdb git commit: Asterix MessageBroker implementation
Date: Wed, 23 Dec 2015 02:30:42 +0000 (UTC)

Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 8b41d0bc2 -> fc7272c0c

Asterix MessageBroker implementation

This change includes the following:
- Add implementation for CC/NC MessageBroker.
- Implement GlobalResourceIdFactory using MessageBroker.
- Change resource id factory to GlobalResourceIdFactory.
- Refactor metadata indexes fixed properties.
- Use fixed resource ids for metadata indexes.
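For readers skimming the change list above, the following is a minimal sketch of the id-allocation idea behind the CC-side broker in this commit: each NC reports the largest resource id it holds, the CC keeps the maximum, and the CC hands out new ids only after every node has reported. The class name GlobalIdAllocatorSketch, the fixed expectedNodes count, and the OptionalLong return type are assumptions made for illustration. The committed CCMessageBroker instead replies with a resource id of -1 plus an exception, and also checks that the cluster is active, which this sketch omits.

```java
import java.util.HashSet;
import java.util.OptionalLong;
import java.util.Set;

/** Hypothetical, simplified model of CC-side global resource id allocation. */
final class GlobalIdAllocatorSketch {
    // Number of NCs expected to report (assumed fixed for this sketch).
    private final int expectedNodes;
    // Nodes that have already reported their local maximum resource id.
    private final Set<String> reportedNodes = new HashSet<>();
    // Largest id known so far; the next id handed out is this value plus one.
    private long globalResourceId = 0;

    GlobalIdAllocatorSketch(int expectedNodes) {
        this.expectedNodes = expectedNodes;
    }

    /** Records the largest resource id stored on the given node. */
    synchronized void reportMaxResourceId(String nodeId, long maxResourceId) {
        globalResourceId = Math.max(globalResourceId, maxResourceId);
        reportedNodes.add(nodeId);
    }

    /** Returns the next global id, or empty while some node has not reported. */
    synchronized OptionalLong nextResourceId() {
        if (reportedNodes.size() < expectedNodes) {
            return OptionalLong.empty();
        }
        globalResourceId++;
        return OptionalLong.of(globalResourceId);
    }

    public static void main(String[] args) {
        GlobalIdAllocatorSketch allocator = new GlobalIdAllocatorSketch(2);
        allocator.reportMaxResourceId("nc1", 120);
        // Only one of two nodes has reported, so no id is issued yet.
        System.out.println(allocator.nextResourceId().isPresent());
        allocator.reportMaxResourceId("nc2", 95);
        // All nodes reported; ids continue above the largest reported value.
        System.out.println(allocator.nextResourceId().getAsLong());
    }
}
```

Starting above the largest reported id is what keeps newly issued ids from colliding with resources that already exist on any node after a restart.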
Change-Id: If4320e2c5a0130d2f86a4be6ae61f5cee43e30af Reviewed-on: https://asterix-gerrit.ics.uci.edu/486 Reviewed-by: Yingyi Bu Tested-by: Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/fc7272c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/fc7272c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/fc7272c0 Branch: refs/heads/master Commit: fc7272c0cff6d2f5c53278eba4aea889b5ed1c06 Parents: 8b41d0b Author: Murtadha Hubail Authored: Tue Dec 22 17:14:53 2015 -0800 Committer: Murtadha Hubail Committed: Tue Dec 22 18:26:20 2015 -0800 ---------------------------------------------------------------------- .../api/common/AsterixAppRuntimeContext.java | 24 ++-- ...rixAppRuntimeContextProdiverForRecovery.java | 4 +- .../bootstrap/CCApplicationEntryPoint.java | 6 + .../bootstrap/NCApplicationEntryPoint.java | 15 ++- .../asterix/messaging/CCMessageBroker.java | 109 ++++++++++++++++ .../asterix/messaging/NCMessageBroker.java | 94 ++++++++++++++ asterix-common/pom.xml | 10 +- .../common/api/IAsterixAppRuntimeContext.java | 3 +- .../messaging/AbstractApplicationMessage.java | 36 ++++++ .../messaging/ReportMaxResourceIdMessage.java | 37 ++++++ .../ReportMaxResourceIdRequestMessage.java | 37 ++++++ .../messaging/ResourceIdRequestMessage.java | 28 ++++ .../ResourceIdRequestResponseMessage.java | 47 +++++++ .../messaging/api/IApplicationMessage.java | 47 +++++++ .../api/IApplicationMessageCallback.java | 30 +++++ .../common/messaging/api/INCMessageBroker.java | 40 ++++++ .../IAsterixAppRuntimeContextProvider.java | 4 +- .../apache/asterix/metadata/MetadataNode.java | 5 +- .../metadata/bootstrap/MetadataBootstrap.java | 9 +- .../metadata/bootstrap/MetadataIndex.java | 17 +-- .../MetadataIndexImmutableProperties.java | 79 ++++++++++++ .../bootstrap/MetadataPrimaryIndexes.java | 127 +++++++++---------- 
.../bootstrap/MetadataSecondaryIndexes.java | 27 ++-- .../om/util/AsterixClusterProperties.java | 12 ++ .../resource/GlobalResourceIdFactory.java | 76 +++++++++++ .../GlobalResourceIdFactoryProvider.java | 34 +++++ .../AsterixRuntimeComponentsProvider.java | 4 +- 27 files changed, 838 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java index ce19139..45c0598 100644 --- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java +++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java @@ -49,12 +49,13 @@ import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.feeds.FeedManager; -import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; +import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties; import org.apache.asterix.om.util.AsterixClusterProperties; import org.apache.asterix.replication.management.ReplicationChannel; import org.apache.asterix.replication.management.ReplicationManager; import org.apache.asterix.replication.recovery.RemoteRecoveryManager; import org.apache.asterix.replication.storage.ReplicaResourcesManager; +import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; @@ -81,8 +82,7 @@ import org.apache.hyracks.storage.common.file.IFileMapManager; import org.apache.hyracks.storage.common.file.IFileMapProvider; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; -import org.apache.hyracks.storage.common.file.ResourceIdFactory; -import org.apache.hyracks.storage.common.file.ResourceIdFactoryProvider; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider { @@ -118,7 +118,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst private ILSMIOOperationScheduler lsmIOScheduler; private ILocalResourceRepository localResourceRepository; - private ResourceIdFactory resourceIdFactory; + private IResourceIdFactory resourceIdFactory; private IIOManager ioManager; private boolean isShuttingdown; @@ -158,13 +158,14 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst metadataMergePolicyFactory = new PrefixMergePolicyFactory(); + ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory( + ioManager, ncApplicationContext.getNodeId()); + localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository(); + IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery( this); txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider, txnProperties); - ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory( - ioManager, 
ncApplicationContext.getNodeId()); - localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository(); IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager(); SystemState systemState = recoveryMgr.getSystemState(); @@ -175,7 +176,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst initializeResourceIdFactory(); datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository, - MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager()); + MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager()); isShuttingdown = false; @@ -278,7 +279,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst return localResourceRepository; } - public ResourceIdFactory getResourceIdFactory() { + public IResourceIdFactory getResourceIdFactory() { return resourceIdFactory; } @@ -376,6 +377,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst @Override public void initializeResourceIdFactory() throws HyracksDataException { - resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory(); + resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext) + .createResourceIdFactory(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java index 570c3c9..b975970 100644 --- 
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java +++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java @@ -32,7 +32,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.ResourceIdFactory; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; public class AsterixAppRuntimeContextProdiverForRecovery implements IAsterixAppRuntimeContextProvider { @@ -78,7 +78,7 @@ public class AsterixAppRuntimeContextProdiverForRecovery implements IAsterixAppR } @Override - public ResourceIdFactory getResourceIdFactory() { + public IResourceIdFactory getResourceIdFactory() { return asterixAppRuntimeContext.getResourceIdFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index 80ce5ea..d2164f4 100644 --- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -43,6 +43,7 @@ import org.apache.asterix.compiler.provider.SqlppCompilationProvider; import org.apache.asterix.event.service.ILookupService; import org.apache.asterix.feeds.CentralFeedManager; import org.apache.asterix.feeds.FeedLifecycleListener; +import org.apache.asterix.messaging.CCMessageBroker; import 
org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.api.IAsterixStateProxy; import org.apache.asterix.metadata.bootstrap.AsterixStateProxy; @@ -55,6 +56,8 @@ import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; +import org.apache.hyracks.api.messages.IMessageBroker; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -72,9 +75,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { private static IAsterixStateProxy proxy; private ICCApplicationContext appCtx; + private IMessageBroker messageBroker; @Override public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception { + messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService()); this.appCtx = ccAppCtx; if (LOGGER.isLoggable(Level.INFO)) { @@ -118,6 +123,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { } ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE); + ccAppCtx.setMessageBroker(messageBroker); } @Override http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index 2cd6a1a..147a356 100644 --- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ 
b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -34,12 +34,13 @@ import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.asterix.common.config.AsterixReplicationProperties; import org.apache.asterix.common.config.AsterixTransactionProperties; import org.apache.asterix.common.config.IAsterixPropertiesProvider; +import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.asterix.event.schema.cluster.Node; -import org.apache.asterix.event.schema.cluster.SubstituteNodes; +import org.apache.asterix.messaging.NCMessageBroker; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.api.IAsterixStateProxy; @@ -54,6 +55,8 @@ import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.application.INCApplicationEntryPoint; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; +import org.apache.hyracks.api.messages.IMessageBroker; +import org.apache.hyracks.control.nc.NodeControllerService; import org.kohsuke.args4j.CmdLineException; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; @@ -75,6 +78,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { private SystemState systemState = SystemState.NEW_UNIVERSE; private boolean performedRemoteRecovery = false; private boolean replicationEnabled = false; + private IMessageBroker messageBroker; @Override public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception { @@ -91,6 +95,8 @@ public class 
NCApplicationEntryPoint implements INCApplicationEntryPoint { ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager())); ncApplicationContext = ncAppCtx; + messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService()); + ncApplicationContext.setMessageBroker(messageBroker); nodeId = ncApplicationContext.getNodeId(); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting Asterix node controller: " + nodeId); @@ -191,6 +197,9 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { @Override public void notifyStartupComplete() throws Exception { + //send max resource id on this NC to the CC + ((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId(); + AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext) .getMetadataProperties(); @@ -250,8 +259,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Configured:" + lccm); } - ncApplicationContext.setStateDumpHandler(new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm - .getDumpPath(), lccm)); + ncApplicationContext.setStateDumpHandler( + new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm)); lccm.startAll(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java new file mode 100644 index 0000000..095ef1b --- /dev/null +++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.messaging; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage; +import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage; +import org.apache.asterix.common.messaging.ResourceIdRequestMessage; +import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage; +import org.apache.asterix.om.util.AsterixClusterProperties; +import org.apache.hyracks.api.messages.IMessage; +import org.apache.hyracks.api.messages.IMessageBroker; +import org.apache.hyracks.api.util.JavaSerializationUtils; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.NodeControllerState; + +public class CCMessageBroker implements IMessageBroker { + + private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName()); + private final AtomicLong globalResourceId = new AtomicLong(0); + private final ClusterControllerService ccs; + private final Set nodesReportedMaxResourceId = new HashSet<>(); + public static final long 
NO_CALLBACK_MESSAGE_ID = -1; + + public CCMessageBroker(ClusterControllerService ccs) { + this.ccs = ccs; + } + + @Override + public void receivedMessage(IMessage message, String nodeId) throws Exception { + AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message; + switch (absMessage.getMessageType()) { + case RESOURCE_ID_REQUEST: + handleResourceIdRequest(message, nodeId); + break; + case REPORT_MAX_RESOURCE_ID_RESPONSE: + handleReportResourceMaxIdResponse(message, nodeId); + break; + default: + LOGGER.warning("Unknown message: " + absMessage.getMessageType()); + break; + } + } + + private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception { + ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message; + ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage(); + reponse.setId(msg.getId()); + //cluster is not active + if (!AsterixClusterProperties.isClusterActive()) { + reponse.setResourceId(-1); + reponse.setException(new Exception("Cannot generate global resource id when cluster is not active.")); + } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) { + //some node has not reported max resource id + reponse.setResourceId(-1); + reponse.setException(new Exception("One or more nodes has not reported max resource id.")); + requestMaxResourceID(); + } else { + reponse.setResourceId(globalResourceId.incrementAndGet()); + } + sendApplicationMessageToNC(reponse, nodeId); + } + + private synchronized void handleReportResourceMaxIdResponse(IMessage message, String nodeId) throws Exception { + ReportMaxResourceIdMessage msg = (ReportMaxResourceIdMessage) message; + globalResourceId.set(Math.max(msg.getMaxResourceId(), globalResourceId.get())); + nodesReportedMaxResourceId.add(nodeId); + } + + private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception { + Map nodeMap = ccs.getNodeMap(); + NodeControllerState 
state = nodeMap.get(nodeId); + state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); + } + + private void requestMaxResourceID() throws Exception { + //send request to NCs that have not reported their max resource ids + Set getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes(); + ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage(); + msg.setId(NO_CALLBACK_MESSAGE_ID); + for (String nodeId : getParticipantNodes) { + if (!nodesReportedMaxResourceId.contains(nodeId)) { + sendApplicationMessageToNC(msg, nodeId); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java new file mode 100644 index 0000000..001771e --- /dev/null +++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.messaging; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage; +import org.apache.asterix.common.messaging.api.IApplicationMessage; +import org.apache.asterix.common.messaging.api.IApplicationMessageCallback; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties; +import org.apache.hyracks.api.messages.IMessage; +import org.apache.hyracks.api.util.JavaSerializationUtils; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class NCMessageBroker implements INCMessageBroker { + private final NodeControllerService ncs; + private final AtomicLong messageId = new AtomicLong(0); + private final Map callbacks; + + public NCMessageBroker(NodeControllerService ncs) { + this.ncs = ncs; + callbacks = new ConcurrentHashMap(); + } + + @Override + public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception { + if (callback != null) { + long uniqueMessageId = messageId.incrementAndGet(); + message.setId(uniqueMessageId); + callbacks.put(uniqueMessageId, callback); + } + try { + ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null); + } catch (Exception e) { + if (callback != null) { + //remove the callback in case of failure + callbacks.remove(message.getId()); + } + throw e; + } + } + + @Override + public void receivedMessage(IMessage message, String nodeId) throws Exception { + AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message; + //if the received message is a response to a sent message, deliver it to the sender + IApplicationMessageCallback callback = 
callbacks.remove(absMessage.getId()); + if (callback != null) { + callback.deliverMessageResponse(absMessage); + } + + //handle requests from CC + switch (absMessage.getMessageType()) { + case REPORT_MAX_RESOURCE_ID_REQUEST: + reportMaxResourceId(); + break; + default: + break; + } + } + + @Override + public void reportMaxResourceId() throws Exception { + IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext() + .getApplicationObject(); + ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(); + //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes. + long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(), + MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); + maxResourceIdMsg.setMaxResourceId(maxResourceId); + sendMessage(maxResourceIdMsg, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/pom.xml ---------------------------------------------------------------------- diff --git a/asterix-common/pom.xml b/asterix-common/pom.xml index 512502b..02f651f 100644 --- a/asterix-common/pom.xml +++ b/asterix-common/pom.xml @@ -215,7 +215,15 @@ org.apache.hyracks hyracks-storage-am-lsm-rtree - + + org.apache.hyracks + hyracks-control-cc + + + org.apache.hyracks + hyracks-control-nc + + com.fasterxml.jackson.core jackson-core 2.2.0 http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java index 94f5b2f..cd829e7 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java +++ 
b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java @@ -39,6 +39,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; import org.apache.hyracks.storage.common.file.ResourceIdFactory; public interface IAsterixAppRuntimeContext { @@ -65,7 +66,7 @@ public interface IAsterixAppRuntimeContext { public IDatasetLifecycleManager getDatasetLifecycleManager(); - public ResourceIdFactory getResourceIdFactory(); + public IResourceIdFactory getResourceIdFactory(); public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java new file mode 100644 index 0000000..fbb9b86 --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging; + +import org.apache.asterix.common.messaging.api.IApplicationMessage; + +public abstract class AbstractApplicationMessage implements IApplicationMessage { + private static final long serialVersionUID = 1L; + private long id; + + @Override + public void setId(long id) { + this.id = id; + } + + @Override + public long getId() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java new file mode 100644 index 0000000..a2b94a7 --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging; + +public class ReportMaxResourceIdMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + public long maxResourceId; + + @Override + public ApplicationMessageType getMessageType() { + return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE; + } + + public long getMaxResourceId() { + return maxResourceId; + } + + public void setMaxResourceId(long maxResourceId) { + this.maxResourceId = maxResourceId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java new file mode 100644 index 0000000..d2837ce --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging; + +public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + public long maxResourceId; + + @Override + public ApplicationMessageType getMessageType() { + return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST; + } + + public long getMaxResourceId() { + return maxResourceId; + } + + public void setMaxResourceId(long maxResourceId) { + this.maxResourceId = maxResourceId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java new file mode 100644 index 0000000..daeb9c4 --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging; + +public class ResourceIdRequestMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + + @Override + public ApplicationMessageType getMessageType() { + return ApplicationMessageType.RESOURCE_ID_REQUEST; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java new file mode 100644 index 0000000..09c50d3 --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging; + +public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + + private long resourceId; + private Exception exception; + + @Override + public ApplicationMessageType getMessageType() { + return ApplicationMessageType.RESOURCE_ID_RESPONSE; + } + + public long getResourceId() { + return resourceId; + } + + public void setResourceId(long resourceId) { + this.resourceId = resourceId; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java new file mode 100644 index 0000000..61ab7cd --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging.api; + +import org.apache.hyracks.api.messages.IMessage; + +public interface IApplicationMessage extends IMessage { + + public enum ApplicationMessageType { + RESOURCE_ID_REQUEST, + RESOURCE_ID_RESPONSE, + REPORT_MAX_RESOURCE_ID_REQUEST, + REPORT_MAX_RESOURCE_ID_RESPONSE + } + + public abstract ApplicationMessageType getMessageType(); + + /** + * Sets a unique message id that identifies this message within an NC. + * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)} + * when the callback is not null to notify the sender when the response to that message is received. + * + * @param messageId + */ + public void setId(long messageId); + + /** + * @return The unique message id if it has been set, otherwise 0. 
+ */ + public long getId(); +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java new file mode 100644 index 0000000..3bad5fb --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging.api; + +public interface IApplicationMessageCallback { + + /** + * Notifies the message sender when the response has been received. 
+ * + * @param message + * The response message + */ + public void deliverMessageResponse(IApplicationMessage message); +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java new file mode 100644 index 0000000..3ff83b6 --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.messaging.api; + +import org.apache.hyracks.api.messages.IMessageBroker; + +public interface INCMessageBroker extends IMessageBroker { + + /** + * Sends application message from this NC to the CC. 
+ * + * @param message + * @param callback + * @throws Exception + */ + public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception; + + /** + * Sends the maximum resource id on this NC to the CC. + * + * @throws Exception + */ + public void reportMaxResourceId() throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java index d308564..6382af9 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java @@ -30,7 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.ResourceIdFactory; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; public interface IAsterixAppRuntimeContextProvider { @@ -52,7 +52,7 @@ public interface IAsterixAppRuntimeContextProvider { public ILocalResourceRepository getLocalResourceRepository(); - public ResourceIdFactory getResourceIdFactory(); + public IResourceIdFactory getResourceIdFactory(); public IIOManager getIOManager(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java 
---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 47203cf..c402bef 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -41,6 +41,7 @@ import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; import org.apache.asterix.metadata.api.IMetadataIndex; import org.apache.asterix.metadata.api.IMetadataNode; import org.apache.asterix.metadata.api.IValueExtractor; +import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties; import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; import org.apache.asterix.metadata.bootstrap.MetadataSecondaryIndexes; import org.apache.asterix.metadata.entities.CompactionPolicy; @@ -108,7 +109,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; public class MetadataNode implements IMetadataNode { private static final long serialVersionUID = 1L; - private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID); + private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataIndexImmutableProperties.METADATA.getDatasetId()); private IDatasetLifecycleManager datasetLifecycleManager; private ITransactionSubsystem transactionSubsystem; @@ -1087,7 +1088,7 @@ public class MetadataNode implements IMetadataNode { @Override public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException { - int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID; + int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID; try { String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString(); IIndex indexInstance = 
datasetLifecycleManager.getIndex(resourceName); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index c068657..d76af86 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -20,7 +20,6 @@ package org.apache.asterix.metadata.bootstrap; import java.io.File; -import java.rmi.RemoteException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -39,7 +38,6 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.IAsterixPropertiesProvider; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataException; @@ -73,7 +71,6 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; @@ -223,7 +220,7 @@ public class MetadataBootstrap { 
} } - public static void stopUniverse() throws HyracksDataException { + public static void stopUniverse() { // Close all BTree files in BufferCache. // metadata datasets will be closed when the dataset life cycle manger is closed } @@ -404,7 +401,7 @@ public class MetadataBootstrap { LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true); lsmBtree.create(); - resourceID = runtimeContext.getResourceIdFactory().createId(); + resourceID = index.getResourceID(); ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits, comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(), runtimeContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, @@ -443,7 +440,7 @@ public class MetadataBootstrap { return metadataNodeName; } - public static void startDDLRecovery() throws RemoteException, ACIDException, MetadataException { + public static void startDDLRecovery() throws MetadataException { //#. clean up any record which has pendingAdd/DelOp flag // as traversing all records from DATAVERSE_DATASET to DATASET_DATASET, and then to INDEX_DATASET. 
String dataverseName = null; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java index 3a76db7..9ef9f84 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java @@ -79,9 +79,9 @@ public final class MetadataIndex implements IMetadataIndex { // PrimaryKeyField indexes used for secondary index operations protected final int[] primaryKeyIndexes; - public MetadataIndex(String datasetName, String indexName, int numFields, IAType[] keyTypes, - List<List<String>> keyNames, int numSecondaryIndexKeys, ARecordType payloadType, int datasetId, - boolean isPrimaryIndex, int[] primaryKeyIndexes) throws AsterixRuntimeException { + public MetadataIndex(MetadataIndexImmutableProperties indexImmutableProperties, int numFields, IAType[] keyTypes, + List<List<String>> keyNames, int numSecondaryIndexKeys, ARecordType payloadType, boolean isPrimaryIndex, + int[] primaryKeyIndexes) throws AsterixRuntimeException { // Sanity checks. if (keyTypes.length != keyNames.size()) { throw new AsterixRuntimeException("Unequal number of key types and names given."); @@ -90,12 +90,8 @@ public final class MetadataIndex implements IMetadataIndex { throw new AsterixRuntimeException("Number of keys given is greater than total number of fields."); } // Set simple fields.
- this.datasetName = datasetName; - if (indexName == null) { - this.indexName = datasetName; - } else { - this.indexName = indexName; - } + this.datasetName = indexImmutableProperties.getDatasetName(); + this.indexName = indexImmutableProperties.getIndexName(); this.keyTypes = keyTypes; this.keyNames = keyNames; this.payloadType = payloadType; @@ -147,11 +143,12 @@ public final class MetadataIndex implements IMetadataIndex { } } - this.datasetId = new DatasetId(datasetId); + this.datasetId = new DatasetId(indexImmutableProperties.getDatasetId()); this.isPrimaryIndex = isPrimaryIndex; //PrimaryKeyFieldIndexes this.primaryKeyIndexes = primaryKeyIndexes; + this.resourceId = indexImmutableProperties.getResourceId(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java new file mode 100644 index 0000000..9b4d0d1 --- /dev/null +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.bootstrap; + +public enum MetadataIndexImmutableProperties { + METADATA(MetadataConstants.METADATA_DATAVERSE_NAME, 0, 0), + DATAVERSE("Dataverse", 1, 1), + DATASET("Dataset", 2, 2), + DATATYPE("Datatype", 3, 3), + INDEX("Index", 4, 4), + NODE("Node", 5, 5), + NODEGROUP("Nodegroup", 6, 6), + FUNCTION("Function", 7, 7), + DATASOURCE_ADAPTER("DatasourceAdapter", 8, 8), + LIBRARY("Library", 9, 9), + FEED("Feed", 10, 10), + FEED_ACTIVITY_DATASET_ID("FeedActivity", 11, 11), + FEED_POLICY("FeedPolicy", 12, 12), + COMPACTION_POLICY("CompactionPolicy", 13, 13), + EXTERNAL_FILE("ExternalFile", 14, 14), + GROUPNAME_ON_DATASET("GroupName", DATASET, 15), + DATATYPE_NAME_ON_DATASET("DatatypeName", DATASET, 16), + DATATYPE_NAME_ON_DATATYPE("DatatypeName", DATATYPE, 17); + + private final String indexName; + private final int datasetId; + private final long resourceId; + private final MetadataIndexImmutableProperties dataset; + + public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100; + + private MetadataIndexImmutableProperties(String indexName, int datasetId, long resourceId) { + this.indexName = indexName; + this.datasetId = datasetId; + this.resourceId = resourceId; + //a primary index's dataset is itself + this.dataset = this; + } + + private MetadataIndexImmutableProperties(String indexName, MetadataIndexImmutableProperties dataset, + long resourceId) { + this.indexName = indexName; + this.datasetId = dataset.datasetId; + this.resourceId = resourceId; + this.dataset = dataset; + } + + public long 
getResourceId() { + return resourceId; + } + + public String getIndexName() { + return indexName; + } + + public String getDatasetName() { + return dataset.indexName; + } + + public int getDatasetId() { + return dataset.datasetId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java index 9bcad8f..258e1c4 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java @@ -46,29 +46,9 @@ public class MetadataPrimaryIndexes { public static IMetadataIndex COMPACTION_POLICY_DATASET; public static IMetadataIndex EXTERNAL_FILE_DATASET; - public static final int METADATA_DATASET_ID = 0; - public static final int DATAVERSE_DATASET_ID = 1; - public static final int DATASET_DATASET_ID = 2; - public static final int DATATYPE_DATASET_ID = 3; - public static final int INDEX_DATASET_ID = 4; - public static final int NODE_DATASET_ID = 5; - public static final int NODEGROUP_DATASET_ID = 6; - public static final int FUNCTION_DATASET_ID = 7; - public static final int DATASOURCE_ADAPTER_DATASET_ID = 8; - - public static final int LIBRARY_DATASET_ID = 9; - public static final int FEED_DATASET_ID = 10; - public static final int FEED_ACTIVITY_DATASET_ID = 11; - public static final int FEED_POLICY_DATASET_ID = 12; - public static final int COMPACTION_POLICY_DATASET_ID = 13; - public static final int EXTERNAL_FILE_DATASET_ID = 14; - - public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100; - /** * Create all 
metadata primary index descriptors. MetadataRecordTypes must * have been initialized before calling this init. - * * @throws MetadataException * If MetadataRecordTypes have not been initialized. */ @@ -79,63 +59,70 @@ public class MetadataPrimaryIndexes { "Must initialize MetadataRecordTypes before initializing MetadataPrimaryIndexes"); } - DATAVERSE_DATASET = new MetadataIndex("Dataverse", null, 2, new IAType[] { BuiltinType.ASTRING }, - (Arrays.asList(Arrays.asList("DataverseName"))), 0, MetadataRecordTypes.DATAVERSE_RECORDTYPE, - DATAVERSE_DATASET_ID, true, new int[] { 0 }); - - DATASET_DATASET = new MetadataIndex("Dataset", null, 3, - new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList( - Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 0, - MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID, true, new int[] { 0, 1 }); - - DATATYPE_DATASET = new MetadataIndex("Datatype", null, 3, new IAType[] { BuiltinType.ASTRING, - BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"))), - 0, MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID, true, new int[] { 0, 1 }); - - INDEX_DATASET = new MetadataIndex("Index", null, 4, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, - BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"), - Arrays.asList("IndexName"))), 0, MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID, true, - new int[] { 0, 1, 2 }); - - NODE_DATASET = new MetadataIndex("Node", null, 2, new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays - .asList("NodeName"))), 0, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID, true, new int[] { 0 }); - - NODEGROUP_DATASET = new MetadataIndex("Nodegroup", null, 2, new IAType[] { BuiltinType.ASTRING }, - (Arrays.asList(Arrays.asList("GroupName"))), 0, MetadataRecordTypes.NODEGROUP_RECORDTYPE, - NODEGROUP_DATASET_ID, true, new int[] { 0 }); - - FUNCTION_DATASET = 
new MetadataIndex("Function", null, 4, new IAType[] { BuiltinType.ASTRING, - BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), - Arrays.asList("Name"), Arrays.asList("Arity"))), 0, MetadataRecordTypes.FUNCTION_RECORDTYPE, - FUNCTION_DATASET_ID, true, new int[] { 0, 1, 2 }); - - DATASOURCE_ADAPTER_DATASET = new MetadataIndex("DatasourceAdapter", null, 3, new IAType[] { - BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), - Arrays.asList("Name"))), 0, MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, - DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0, 1 }); + DATAVERSE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATAVERSE, 2, + new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"))), 0, + MetadataRecordTypes.DATAVERSE_RECORDTYPE, true, new int[] { 0 }); + + DATASET_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATASET, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 0, + MetadataRecordTypes.DATASET_RECORDTYPE, true, new int[] { 0, 1 }); + + DATATYPE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"))), 0, + MetadataRecordTypes.DATATYPE_RECORDTYPE, true, new int[] { 0, 1 }); + + INDEX_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.INDEX, 4, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"), + Arrays.asList("IndexName"))), + 0, MetadataRecordTypes.INDEX_RECORDTYPE, true, new int[] { 0, 1, 2 }); + + NODE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.NODE, 2, new IAType[] { BuiltinType.ASTRING }, + 
(Arrays.asList(Arrays.asList("NodeName"))), 0, MetadataRecordTypes.NODE_RECORDTYPE, true, + new int[] { 0 }); + + NODEGROUP_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.NODEGROUP, 2, + new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("GroupName"))), 0, + MetadataRecordTypes.NODEGROUP_RECORDTYPE, true, new int[] { 0 }); + + FUNCTION_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FUNCTION, 4, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"), Arrays.asList("Arity"))), 0, + MetadataRecordTypes.FUNCTION_RECORDTYPE, true, new int[] { 0, 1, 2 }); + + DATASOURCE_ADAPTER_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATASOURCE_ADAPTER, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0, + MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, true, + new int[] { 0, 1 }); - FEED_DATASET = new MetadataIndex("Feed", null, 3, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, + FEED_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FEED, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("FeedName"))), 0, - MetadataRecordTypes.FEED_RECORDTYPE, FEED_DATASET_ID, true, new int[] { 0, 1 }); + MetadataRecordTypes.FEED_RECORDTYPE, true, new int[] { 0, 1 }); - LIBRARY_DATASET = new MetadataIndex("Library", null, 3, - new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList( - Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0, - MetadataRecordTypes.LIBRARY_RECORDTYPE, LIBRARY_DATASET_ID, true, new int[] { 0, 1 }); + LIBRARY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.LIBRARY, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), 
Arrays.asList("Name"))), 0, + MetadataRecordTypes.LIBRARY_RECORDTYPE, true, new int[] { 0, 1 }); - FEED_POLICY_DATASET = new MetadataIndex("FeedPolicy", null, 3, new IAType[] { BuiltinType.ASTRING, - BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("PolicyName"))), 0, - MetadataRecordTypes.FEED_POLICY_RECORDTYPE, FEED_POLICY_DATASET_ID, true, new int[] { 0, 1 }); + FEED_POLICY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FEED_POLICY, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("PolicyName"))), 0, + MetadataRecordTypes.FEED_POLICY_RECORDTYPE, true, new int[] { 0, 1 }); - COMPACTION_POLICY_DATASET = new MetadataIndex("CompactionPolicy", null, 3, new IAType[] { BuiltinType.ASTRING, - BuiltinType.ASTRING }, + COMPACTION_POLICY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.COMPACTION_POLICY, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("CompactionPolicy"))), 0, - MetadataRecordTypes.COMPACTION_POLICY_RECORDTYPE, COMPACTION_POLICY_DATASET_ID, true, + MetadataRecordTypes.COMPACTION_POLICY_RECORDTYPE, true, new int[] { 0, 1 }); - EXTERNAL_FILE_DATASET = new MetadataIndex("ExternalFile", null, 4, new IAType[] { BuiltinType.ASTRING, - BuiltinType.ASTRING, BuiltinType.AINT32 }, (Arrays.asList(Arrays.asList("DataverseName"), - Arrays.asList("DatasetName"), Arrays.asList("FileNumber"))), 0, - MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, EXTERNAL_FILE_DATASET_ID, true, new int[] { 0, 1, 2 }); + EXTERNAL_FILE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.EXTERNAL_FILE, 4, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"), + Arrays.asList("FileNumber"))), + 0, MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, true, new int[] { 0, 1, 2 
}); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java index 651021c..fbe339f 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java @@ -37,7 +37,6 @@ public class MetadataSecondaryIndexes { /** * Create all metadata secondary index descriptors. MetadataRecordTypes must * have been initialized before calling this init. - * * @throws MetadataException * If MetadataRecordTypes have not been initialized. 
*/ @@ -48,21 +47,23 @@ public class MetadataSecondaryIndexes { "Must initialize MetadataRecordTypes before initializing MetadataSecondaryIndexes."); } - GROUPNAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "GroupName", 3, new IAType[] { BuiltinType.ASTRING, - BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("GroupName"), - Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 1, null, - MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 1, 2 }); + GROUPNAME_ON_DATASET_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.GROUPNAME_ON_DATASET, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("GroupName"), Arrays.asList("DataverseName"), + Arrays.asList("DatasetName"))), + 1, null, false, new int[] { 1, 2 }); - DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "DatatypeName", 3, new IAType[] { - BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList( - Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"), Arrays.asList("DatasetName"))), 2, null, - MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 0, 2 }); + DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE_NAME_ON_DATASET, 3, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, + (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"), + Arrays.asList("DatasetName"))), + 2, null, false, new int[] { 0, 2 }); - DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex("Datatype", "DatatypeName", 3, new IAType[] { - BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, + DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE_NAME_ON_DATATYPE, + 3, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("NestedDatatypeName"), - 
Arrays.asList("TopDatatypeName"))), 2, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID, false, - new int[] { 0, 2 }); + Arrays.asList("TopDatatypeName"))), + 2, null, false, new int[] { 0, 2 }); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java ---------------------------------------------------------------------- diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java index 438805a..f2482da 100644 --- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java +++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java @@ -185,4 +185,16 @@ public class AsterixClusterProperties { public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) { this.globalRecoveryCompleted = globalRecoveryCompleted; } + + public static boolean isClusterActive() { + if (AsterixClusterProperties.INSTANCE.getCluster() == null) { + //this is a virtual cluster + return true; + } + return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE; + } + + public static int getNumberOfNodes(){ + return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size(); + } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java new file mode 100644 index 0000000..ca7ba51 --- /dev/null +++ 
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.resource; + +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.asterix.common.messaging.ResourceIdRequestMessage; +import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage; +import org.apache.asterix.common.messaging.api.IApplicationMessage; +import org.apache.asterix.common.messaging.api.IApplicationMessageCallback; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.hyracks.api.application.IApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; + +/** + * A resource id factory that generates unique resource ids across all NCs by requesting unique ids from the cluster controller. 
+ */ +public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback { + + private final IApplicationContext appCtx; + private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ; + + public GlobalResourceIdFactory(IApplicationContext appCtx) { + this.appCtx = appCtx; + this.resourceIdResponseQ = new LinkedBlockingQueue<>(); + } + + @Override + public long createId() throws HyracksDataException { + try { + ResourceIdRequestResponseMessage reponse = null; + //if there already exists a response, use it + if (resourceIdResponseQ.size() > 0) { + synchronized (resourceIdResponseQ) { + if (resourceIdResponseQ.size() > 0) { + reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take(); + } + } + } + //if no response available or it has an exception, request a new one + if (reponse == null || reponse.getException() != null) { + ResourceIdRequestMessage msg = new ResourceIdRequestMessage(); + ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this); + reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take(); + if (reponse.getException() != null) { + throw new HyracksDataException(reponse.getException().getMessage()); + } + } + return reponse.getResourceId(); + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + + @Override + public void deliverMessageResponse(IApplicationMessage message) { + resourceIdResponseQ.offer(message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java new file
mode 100644 index 0000000..ec42139 --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.transaction.management.resource; + +import org.apache.hyracks.api.application.IApplicationContext; + +public class GlobalResourceIdFactoryProvider { + + private final IApplicationContext appCtx; + + public GlobalResourceIdFactoryProvider(IApplicationContext appCtx) { + this.appCtx = appCtx; + } + + public GlobalResourceIdFactory createResourceIdFactory() { + return new GlobalResourceIdFactory(appCtx); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fc7272c0/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java index 1686e17..01a451c 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java @@ -28,7 +28,7 @@ import org.apache.hyracks.storage.common.IStorageManagerInterface; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.ResourceIdFactory; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface, ILSMIOOperationSchedulerProvider { @@ -71,7 +71,7 @@ public class AsterixRuntimeComponentsProvider implements 
IIndexLifecycleManagerP } @Override - public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) { + public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) { return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) .getResourceIdFactory(); }
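Reviewer note: the request/response handoff in GlobalResourceIdFactory.createId() above can be illustrated in isolation. The following is a minimal sketch, not the committed code. The class QueueBackedIdFactory, its LongSupplier "controller", and the thread that simulates the reply are all hypothetical stand-ins for the NC message broker and the cluster controller. It also swaps the size()/synchronized/take() check for a single non-blocking poll(), which is one possible way to consume an already-delivered response atomically.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for an id factory that waits on replies from a controller. */
public class QueueBackedIdFactory {
    // Replies delivered by the (simulated) controller, possibly ahead of demand.
    private final LinkedBlockingQueue<Long> responses = new LinkedBlockingQueue<>();
    // Hypothetical controller that hands out the next cluster-wide id.
    private final LongSupplier controller;

    public QueueBackedIdFactory(LongSupplier controller) {
        this.controller = controller;
    }

    /** Simulates sending a request; the reply is delivered on another thread. */
    private void sendRequest() {
        Thread replier = new Thread(() -> deliverResponse(controller.getAsLong()));
        replier.setDaemon(true);
        replier.start();
    }

    /** Callback invoked when a reply arrives. */
    public void deliverResponse(long id) {
        responses.offer(id);
    }

    public long createId() throws InterruptedException {
        // poll() does not block: it returns null when no reply is queued.
        Long cached = responses.poll();
        if (cached != null) {
            return cached;
        }
        // Nothing cached: ask the controller and block until the reply arrives.
        sendRequest();
        return responses.take();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong next = new AtomicLong(100);
        QueueBackedIdFactory factory = new QueueBackedIdFactory(next::getAndIncrement);
        factory.deliverResponse(7L);            // a reply that arrived early
        System.out.println(factory.createId()); // consumes the cached reply
        System.out.println(factory.createId()); // requests a fresh id
    }
}
```

Like the committed code, the sketch assumes every request eventually receives a reply; a production variant would probably bound the wait, for example with poll(timeout, unit).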