Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 37307200B8C for ; Sat, 20 Aug 2016 08:15:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 340F9160AAB; Sat, 20 Aug 2016 06:15:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BCCBD160AC6 for ; Sat, 20 Aug 2016 08:15:44 +0200 (CEST) Received: (qmail 62779 invoked by uid 500); 20 Aug 2016 06:15:44 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 62349 invoked by uid 99); 20 Aug 2016 06:15:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Aug 2016 06:15:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6FC71E97DD; Sat, 20 Aug 2016 06:15:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: buyingyi@apache.org To: commits@asterixdb.apache.org Date: Sat, 20 Aug 2016 06:15:54 -0000 Message-Id: <385b99d3a31848a78514bb35277ef867@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/16] asterixdb git commit: Add Asterix Extension Manager archived-at: Sat, 20 Aug 2016 06:15:47 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java new file mode 100644 index 0000000..a4b2345 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.cc; + +import org.apache.asterix.common.api.IExtension; +import org.apache.asterix.translator.IStatementExecutorFactory; + +/** + * An interface for extensions of {@code IStatementExecutor} + */ +public interface IStatementExecutorExtension extends IExtension { + @Override + default ExtensionKind getExtensionKind() { + return ExtensionKind.STATEMENT_EXECUTOR; + } + + /** + * @return The extension implementation of the {@code IStatementExecutorFactory} + */ + IStatementExecutorFactory getQueryTranslatorFactory(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java index c44ffc1..fe15ce8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java @@ -26,7 +26,8 @@ import java.util.logging.Logger; import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActivePartitionMessage; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.IJobLifecycleListener; @@ -48,15 +49,17 @@ public class ActiveLifecycleListener implements IJobLifecycleListener { @Override public synchronized void notifyJobStart(JobId jobId) throws HyracksException { - if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(jobId)) { - 
jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_START)); + EntityId entityId = ActiveJobNotificationHandler.INSTANCE.getEntity(jobId); + if (entityId != null) { + jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_START, entityId)); } } @Override public synchronized void notifyJobFinish(JobId jobId) throws HyracksException { - if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(jobId)) { - jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_FINISH)); + EntityId entityId = ActiveJobNotificationHandler.INSTANCE.getEntity(jobId); + if (entityId != null) { + jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_FINISH, entityId)); } else { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); @@ -72,7 +75,7 @@ public class ActiveLifecycleListener implements IJobLifecycleListener { public void receive(ActivePartitionMessage message) { if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(message.getJobId())) { jobEventInbox.add(new ActiveEvent(message.getJobId(), ActiveEvent.EventKind.PARTITION_EVENT, - message.getFeedId(), message.getPayload())); + message.getActiveRuntimeId().getEntityId(), message)); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index 92ef062..95fe68c 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -265,8 +265,8 @@ public class ExternalLibraryUtils { String adapterFactoryClass = adapter.getFactoryClass().trim(); 
String adapterName = libraryName + "#" + adapter.getName().trim(); AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName); - DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, - IDataSourceAdapter.AdapterType.EXTERNAL); + DatasourceAdapter dsa = + new DatasourceAdapter(aid, adapterFactoryClass, IDataSourceAdapter.AdapterType.EXTERNAL); MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Installed adapter: " + adapterName); @@ -334,8 +334,8 @@ public class ExternalLibraryUtils { } // get a reference to the specific library dir - File libDir = new File( - installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName); + File libDir = + new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName); FilenameFilter jarFileFilter = new FilenameFilter() { @Override public boolean accept(File dir, String name) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java index ba15fb1..b815602 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java @@ -22,9 +22,10 @@ import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; -import org.apache.asterix.api.common.SessionConfig; -import org.apache.asterix.api.common.SessionConfig.OutputFormat; -import org.apache.asterix.aql.translator.QueryTranslator; +import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import 
org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.common.app.SessionConfig; +import org.apache.asterix.common.app.SessionConfig.OutputFormat; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.feed.api.IFeedWork; @@ -36,6 +37,7 @@ import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.om.util.AsterixAppContextInfo; +import org.apache.asterix.translator.IStatementExecutor; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -68,6 +70,7 @@ public class FeedWorkCollection { private static class SubscribeFeedWorkRunnable implements Runnable { + private static final DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory(null); private final FeedConnectionRequest request; private final String[] locations; @@ -82,13 +85,13 @@ public class FeedWorkCollection { //TODO(amoudi): route PrintWriter to log file PrintWriter writer = new PrintWriter(System.err, true); SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM); - DataverseDecl dataverseDecl = - new DataverseDecl(new Identifier(request.getReceivingFeedId().getDataverse())); + DataverseDecl dataverseDecl = new DataverseDecl( + new Identifier(request.getReceivingFeedId().getDataverse())); SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request); List statements = new ArrayList(); statements.add(dataverseDecl); statements.add(subscribeStmt); - QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider); + IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider); translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, QueryTranslator.ResultDelivery.SYNC); if 
(LOGGER.isEnabledFor(Level.INFO)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java new file mode 100644 index 0000000..3ebe873 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc; + +import java.io.IOException; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.active.ActiveManager; +import org.apache.asterix.api.common.AsterixAppRuntimeContextProviderForRecovery; +import org.apache.asterix.common.api.AsterixThreadExecutor; +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.AsterixBuildProperties; +import org.apache.asterix.common.config.AsterixCompilerProperties; +import org.apache.asterix.common.config.AsterixExtensionProperties; +import org.apache.asterix.common.config.AsterixExternalProperties; +import org.apache.asterix.common.config.AsterixFeedProperties; +import org.apache.asterix.common.config.AsterixMetadataProperties; +import org.apache.asterix.common.config.AsterixPropertiesAccessor; +import org.apache.asterix.common.config.AsterixReplicationProperties; +import org.apache.asterix.common.config.AsterixStorageProperties; +import org.apache.asterix.common.config.AsterixTransactionProperties; +import org.apache.asterix.common.config.IAsterixPropertiesProvider; +import org.apache.asterix.common.context.AsterixFileMapManager; +import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.replication.IRemoteRecoveryManager; +import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.replication.IReplicationChannel; +import org.apache.asterix.common.replication.IReplicationManager; +import 
org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; +import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataNode; +import org.apache.asterix.metadata.api.IAsterixStateProxy; +import org.apache.asterix.metadata.api.IMetadataNode; +import org.apache.asterix.metadata.bootstrap.MetadataBootstrap; +import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties; +import org.apache.asterix.om.util.AsterixClusterProperties; +import org.apache.asterix.replication.management.ReplicationChannel; +import org.apache.asterix.replication.management.ReplicationManager; +import org.apache.asterix.replication.recovery.RemoteRecoveryManager; +import org.apache.asterix.replication.storage.ReplicaResourcesManager; +import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; +import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; +import org.apache.hyracks.api.application.IApplicationConfig; +import org.apache.hyracks.api.application.INCApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler; +import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory; +import org.apache.hyracks.storage.common.buffercache.BufferCache; +import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy; +import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy; +import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator; +import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy; +import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy; +import org.apache.hyracks.storage.common.file.IFileMapManager; +import org.apache.hyracks.storage.common.file.IFileMapProvider; +import org.apache.hyracks.storage.common.file.ILocalResourceRepository; +import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; + +public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider { + private static final Logger LOGGER = Logger.getLogger(AsterixNCAppRuntimeContext.class.getName()); + + private ILSMMergePolicyFactory metadataMergePolicyFactory; + private final INCApplicationContext ncApplicationContext; + + private AsterixCompilerProperties compilerProperties; + private AsterixExternalProperties externalProperties; + private AsterixMetadataProperties metadataProperties; + private AsterixStorageProperties storageProperties; + private AsterixTransactionProperties txnProperties; + private AsterixFeedProperties feedProperties; + private AsterixBuildProperties buildProperties; + private AsterixReplicationProperties replicationProperties; + + private AsterixThreadExecutor threadExecutor; 
+ private IDatasetLifecycleManager datasetLifecycleManager; + private IFileMapManager fileMapManager; + private IBufferCache bufferCache; + private ITransactionSubsystem txnSubsystem; + + private ILSMIOOperationScheduler lsmIOScheduler; + private PersistentLocalResourceRepository localResourceRepository; + private IResourceIdFactory resourceIdFactory; + private IIOManager ioManager; + private boolean isShuttingdown; + + private ActiveManager activeManager; + + private IReplicationChannel replicationChannel; + private IReplicationManager replicationManager; + private IRemoteRecoveryManager remoteRecoveryManager; + private IReplicaResourcesManager replicaResourcesManager; + private final int metadataRmiPort; + + private final ILibraryManager libraryManager; + private final NCExtensionManager ncExtensionManager; + + public AsterixNCAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) + throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException, + IOException { + this.ncApplicationContext = ncApplicationContext; + // Determine whether to use old-style asterix-configuration.xml or new-style configuration. 
+ // QQQ strip this out eventually + AsterixPropertiesAccessor propertiesAccessor; + IApplicationConfig cfg = ncApplicationContext.getAppConfig(); + // QQQ this is NOT a good way to determine whether the config is valid + if (cfg.getString("cc", "cluster.address") != null) { + propertiesAccessor = new AsterixPropertiesAccessor(cfg); + } else { + propertiesAccessor = new AsterixPropertiesAccessor(); + } + compilerProperties = new AsterixCompilerProperties(propertiesAccessor); + externalProperties = new AsterixExternalProperties(propertiesAccessor); + metadataProperties = new AsterixMetadataProperties(propertiesAccessor); + storageProperties = new AsterixStorageProperties(propertiesAccessor); + txnProperties = new AsterixTransactionProperties(propertiesAccessor); + feedProperties = new AsterixFeedProperties(propertiesAccessor); + buildProperties = new AsterixBuildProperties(propertiesAccessor); + replicationProperties = new AsterixReplicationProperties(propertiesAccessor, + AsterixClusterProperties.INSTANCE.getCluster()); + this.metadataRmiPort = metadataRmiPort; + libraryManager = new ExternalLibraryManager(); + ncExtensionManager = new NCExtensionManager( + new AsterixExtensionProperties(propertiesAccessor).getExtensions()); + } + + @Override + public void initialize(boolean initialRun) throws IOException, ACIDException { + Logger.getLogger("org.apache").setLevel(externalProperties.getLogLevel()); + + threadExecutor = new AsterixThreadExecutor(ncApplicationContext.getThreadFactory()); + fileMapManager = new AsterixFileMapManager(); + ICacheMemoryAllocator allocator = new HeapBufferAllocator(); + IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000); + ioManager = ncApplicationContext.getRootContext().getIOManager(); + IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, + storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages()); + + 
AsynchronousScheduler.INSTANCE.init(ncApplicationContext.getThreadFactory()); + lsmIOScheduler = AsynchronousScheduler.INSTANCE; + + metadataMergePolicyFactory = new PrefixMergePolicyFactory(); + + ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = + new PersistentLocalResourceRepositoryFactory(ioManager, ncApplicationContext.getNodeId(), + metadataProperties); + + localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory + .createRepository(); + + IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = + new AsterixAppRuntimeContextProviderForRecovery(this); + txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider, + txnProperties); + + IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager(); + SystemState systemState = recoveryMgr.getSystemState(); + if (initialRun || systemState == SystemState.NEW_UNIVERSE) { + //delete any storage data before the resource factory is initialized + localResourceRepository.deleteStorageData(true); + } + initializeResourceIdFactory(); + + datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository, + MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(), + ioManager.getIODevices().size()); + + isShuttingdown = false; + + activeManager = new ActiveManager(ncApplicationContext.getNodeId(), + feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize()); + + if (replicationProperties.isReplicationEnabled()) { + String nodeId = ncApplicationContext.getNodeId(); + + replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties); + + replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager, + txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider); + + //pass replication manager to replication required object + 
//LogManager to replicate logs + txnSubsystem.getLogManager().setReplicationManager(replicationManager); + + //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index + localResourceRepository.setReplicationManager(replicationManager); + + /** + * add the partitions that will be replicated in this node as inactive partitions + */ + //get nodes which replicate to this node + Set replicationClients = replicationProperties.getNodeReplicationClients(nodeId); + //remove the node itself + replicationClients.remove(nodeId); + for (String clientId : replicationClients) { + //get the partitions of each client + ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId); + for (ClusterPartition partition : clientPartitions) { + localResourceRepository.addInactivePartition(partition.getPartitionId()); + } + } + + //initialize replication channel + replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(), + replicaResourcesManager, replicationManager, ncApplicationContext, + asterixAppRuntimeContextProvider); + + remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties); + + bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager, + storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory(), + replicationManager); + } else { + bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager, + storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory()); + } + + /* + * The order of registration is important. The buffer cache must registered before recovery and transaction + * managers. 
Notes: registered components are stopped in reversed order + */ + ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager(); + lccm.register((ILifeCycleComponent) bufferCache); + /** + * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager + * to process any logs that might be generated during stopping these components + */ + lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager()); + /** + * ReplicationManager must be stopped after indexLifecycleManager and recovery manager + * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas + */ + if (replicationManager != null) { + lccm.register(replicationManager); + } + lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager()); + /** + * Stopping indexLifecycleManager will flush and close all datasets. + */ + lccm.register((ILifeCycleComponent) datasetLifecycleManager); + lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager()); + lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager()); + } + + @Override + public boolean isShuttingdown() { + return isShuttingdown; + } + + @Override + public void setShuttingdown(boolean isShuttingdown) { + this.isShuttingdown = isShuttingdown; + } + + @Override + public void deinitialize() throws HyracksDataException { + } + + @Override + public IBufferCache getBufferCache() { + return bufferCache; + } + + @Override + public IFileMapProvider getFileMapManager() { + return fileMapManager; + } + + @Override + public ITransactionSubsystem getTransactionSubsystem() { + return txnSubsystem; + } + + @Override + public IDatasetLifecycleManager getDatasetLifecycleManager() { + return datasetLifecycleManager; + } + + @Override + public double getBloomFilterFalsePositiveRate() { + return storageProperties.getBloomFilterFalsePositiveRate(); + } + + @Override + public ILSMIOOperationScheduler getLSMIOScheduler() { + return 
lsmIOScheduler; + } + + @Override + public ILocalResourceRepository getLocalResourceRepository() { + return localResourceRepository; + } + + @Override + public IResourceIdFactory getResourceIdFactory() { + return resourceIdFactory; + } + + @Override + public IIOManager getIOManager() { + return ioManager; + } + + @Override + public AsterixStorageProperties getStorageProperties() { + return storageProperties; + } + + @Override + public AsterixTransactionProperties getTransactionProperties() { + return txnProperties; + } + + @Override + public AsterixCompilerProperties getCompilerProperties() { + return compilerProperties; + } + + @Override + public AsterixMetadataProperties getMetadataProperties() { + return metadataProperties; + } + + @Override + public AsterixExternalProperties getExternalProperties() { + return externalProperties; + } + + @Override + public AsterixFeedProperties getFeedProperties() { + return feedProperties; + } + + @Override + public AsterixBuildProperties getBuildProperties() { + return buildProperties; + } + + @Override + public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) { + return datasetLifecycleManager.getOperationTracker(datasetID); + } + + @Override + public AsterixThreadExecutor getThreadExecutor() { + return threadExecutor; + } + + @Override + public ILSMMergePolicyFactory getMetadataMergePolicyFactory() { + return metadataMergePolicyFactory; + } + + @Override + public ActiveManager getActiveManager() { + return activeManager; + } + + @Override + public AsterixReplicationProperties getReplicationProperties() { + return replicationProperties; + } + + @Override + public IReplicationChannel getReplicationChannel() { + return replicationChannel; + } + + @Override + public IReplicaResourcesManager getReplicaResourcesManager() { + return replicaResourcesManager; + } + + @Override + public IRemoteRecoveryManager getRemoteRecoveryManager() { + return remoteRecoveryManager; + } + + @Override + public IReplicationManager 
getReplicationManager() { + return replicationManager; + } + + @Override + public ILibraryManager getLibraryManager() { + return libraryManager; + } + + @Override + public void initializeResourceIdFactory() throws HyracksDataException { + resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory(); + } + + @Override + public void initializeMetadata(boolean newUniverse) throws Exception { + IAsterixStateProxy proxy; + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Bootstrapping metadata"); + } + MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(), + ncExtensionManager.getMetadataExtensions()); + + proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState(); + if (proxy == null) { + throw new IllegalStateException("Metadata node cannot access distributed state"); + } + + // This is a special case, we just give the metadataNode directly. + // This way we can delay the registration of the metadataNode until + // it is completely initialized. 
+ MetadataManager.instantiate(new MetadataManager(proxy, MetadataNode.INSTANCE)); + MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse); + MetadataBootstrap.startDDLRecovery(); + ncExtensionManager.initializeMetadata(); + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Metadata node bound"); + } + } + + @Override + public void exportMetadataNodeStub() throws RemoteException { + IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort); + ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub); + } + + @Override + public void unexportMetadataNodeStub() throws RemoteException { + UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false); + } + + public NCExtensionManager getNcExtensionManager() { + return ncExtensionManager; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java new file mode 100644 index 0000000..759bf09 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.asterix.app.nc;

import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;

import org.apache.asterix.app.cc.CompilerExtensionManager;
import org.apache.asterix.common.api.IExtension;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.metadata.api.IMetadataExtension;
import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
 * Instantiates and configures the extensions listed in the node configuration and keeps
 * track of the metadata extensions among them, for use on Node Controllers.
 */
public class NCExtensionManager {

    private final MetadataTupleTranslatorProvider tupleTranslatorProvider;
    private final List<IMetadataExtension> mdExtensions;

    /**
     * Initialize {@code NCExtensionManager} from configuration.
     * <p>
     * Every configured extension class is loaded reflectively through its no-arg constructor and
     * configured with its arguments. Only extensions of kind {@code METADATA} are retained; all
     * other kinds are instantiated and configured but otherwise ignored here.
     *
     * @param list
     *            the configured extensions; may be {@code null}, which is treated as "no extensions"
     * @throws InstantiationException
     *             if an extension class cannot be instantiated
     * @throws IllegalAccessException
     *             if an extension class or its no-arg constructor is not accessible
     * @throws ClassNotFoundException
     *             if an extension class name cannot be resolved
     * @throws HyracksDataException
     *             if a metadata extension is encountered after a tuple translator provider has
     *             already been supplied by an earlier one
     */
    public NCExtensionManager(List<AsterixExtension> list)
            throws InstantiationException, IllegalAccessException, ClassNotFoundException, HyracksDataException {
        MetadataTupleTranslatorProvider ttp = null;
        IMetadataExtension tupleTranslatorExtension = null;
        mdExtensions = new ArrayList<>();
        if (list != null) {
            for (AsterixExtension extensionConf : list) {
                IExtension extension = (IExtension) Class.forName(extensionConf.getClassName()).newInstance();
                extension.configure(extensionConf.getArgs());
                switch (extension.getExtensionKind()) {
                    case METADATA:
                        IMetadataExtension mde = (IMetadataExtension) extension;
                        mdExtensions.add(mde);
                        ttp = extendTupleTranslator(ttp, tupleTranslatorExtension, mde);
                        // Remember which extension supplied the provider so that a later conflict
                        // can name both parties.
                        tupleTranslatorExtension = ttp == null ? null : mde;
                        break;
                    default:
                        break;
                }
            }
        }
        // Fall back to the built-in provider when no extension supplied one.
        this.tupleTranslatorProvider = ttp == null ? new MetadataTupleTranslatorProvider() : ttp;
    }

    /**
     * Returns the tuple translator provider of {@code mde}, refusing to do so when a provider has
     * already been chosen.
     *
     * @param ttp
     *            the provider selected so far, or {@code null} if none
     * @param tupleTranslatorExtension
     *            the extension that supplied {@code ttp}; only read when {@code ttp} is non-null
     * @param mde
     *            the metadata extension being registered
     * @return the provider returned by {@code mde} (whatever that extension returns)
     * @throws HyracksDataException
     *             with {@code ERROR_EXTENSION_CONFLICT} if {@code ttp} is already set
     */
    private MetadataTupleTranslatorProvider extendTupleTranslator(MetadataTupleTranslatorProvider ttp,
            IMetadataExtension tupleTranslatorExtension, IMetadataExtension mde) throws HyracksDataException {
        if (ttp != null) {
            throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_EXTENSION_CONFLICT,
                    CompilerExtensionManager.ERROR_MESSAGE_COMPONENT_CONFLICT, tupleTranslatorExtension.getId(),
                    mde.getId(), IMetadataExtension.class.getSimpleName());
        }
        return mde.getMetadataTupleTranslatorProvider();
    }

    /**
     * @return the metadata extensions in configuration order; this is the manager's own list,
     *         not a copy
     */
    public List<IMetadataExtension> getMetadataExtensions() {
        return mdExtensions;
    }

    /**
     * @return the provider supplied by a metadata extension, or a default
     *         {@link MetadataTupleTranslatorProvider} when none supplied one; never {@code null}
     */
    public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
        return tupleTranslatorProvider;
    }

    /**
     * Called on bootstrap of metadata node allowing extensions to instantiate their Metadata artifacts
     *
     * @throws HyracksDataException
     *             wrapping any {@link RemoteException} or {@link ACIDException} raised by an extension
     */
    public void initializeMetadata() throws HyracksDataException {
        for (IMetadataExtension mdExtension : mdExtensions) {
            try {
                mdExtension.initializeMetadata();
            } catch (RemoteException | ACIDException e) {
                throw new HyracksDataException(e);
            }
        }
    }
}
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java new file mode 100644 index 0000000..9b1b32b --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.asterix.app.result;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.io.StringWriter;
import java.nio.ByteBuffer;

import org.apache.asterix.common.app.SessionConfig;
import org.apache.asterix.common.utils.JSONUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;

/**
 * Writes query results to the output of a {@link SessionConfig}, adding whatever framing the
 * session flags ask for (HTML wrapper, session prefix/postfix, wrapping array, CSV header) and
 * updating the supplied {@link Stats} with the number and approximate size of printed records.
 */
public class ResultPrinter {

    // TODO(tillw): Should this be static?
    private static FrameManager resultDisplayFrameMgr = new FrameManager(ResultReader.FRAME_SIZE);

    private final SessionConfig conf;
    private final Stats stats;
    private final ARecordType recordType;

    private final boolean indentJSON;
    private boolean quoteRecord;

    // Whether we are wrapping the output sequence in an array
    private boolean wrapArray = false;
    // Whether at least one instance (or a quoted CSV header) has already been output
    private boolean notFirst = false;

    /**
     * @param conf
     *            session configuration providing the output writer and the format flags
     * @param stats
     *            statistics object whose count and size are incremented per printed record
     * @param recordType
     *            output record type; only needed (and then must be non-null) for CSV with header
     */
    public ResultPrinter(SessionConfig conf, Stats stats, ARecordType recordType) {
        this.conf = conf;
        this.stats = stats;
        this.recordType = recordType;
        this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
        this.quoteRecord = conf.is(SessionConfig.FORMAT_QUOTE_RECORD);
    }

    /**
     * Appends one CSV header line (terminated by CRLF) made of the record type's field names.
     * Each name is wrapped in double quotes and embedded double quotes are doubled.
     */
    private static void appendCSVHeader(Appendable app, ARecordType recordType) throws HyracksDataException {
        try {
            String[] fieldNames = recordType.getFieldNames();
            boolean notfirst = false;
            for (String name : fieldNames) {
                if (notfirst) {
                    app.append(',');
                }
                notfirst = true;
                app.append('"').append(name.replace("\"", "\"\"")).append('"');
            }
            app.append("\r\n");
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }

    /**
     * Emits everything that precedes the records, in this order: the HTML wrapper, the session's
     * result prefix, the opening of the wrapping array, and the CSV header.
     */
    private void printPrefix() throws HyracksDataException {
        if (conf.is(SessionConfig.FORMAT_HTML)) {
            // NOTE(review): the markup inside these two literals (and the closing tag in
            // printPostfix) was reconstructed as a heading followed by a <pre> block -
            // confirm the exact tags against the committed file.
            conf.out().println("<h4>Results:</h4>");
            conf.out().println("<pre>");
        }

        try {
            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
        } catch (AlgebricksException e) {
            throw new HyracksDataException(e);
        }

        if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
            conf.out().print("[ ");
            wrapArray = true;
        }

        if (conf.fmt() == SessionConfig.OutputFormat.CSV && conf.is(SessionConfig.FORMAT_CSV_HEADER)) {
            if (recordType == null) {
                throw new HyracksDataException("Cannot print CSV with header without specifying output-record-type");
            }
            if (quoteRecord) {
                // The header is emitted as a quoted string of its own, so the first data record
                // must be treated as a subsequent element (separator before it).
                StringWriter sw = new StringWriter();
                appendCSVHeader(sw, recordType);
                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
                conf.out().print("\n");
                notFirst = true;
            } else {
                appendCSVHeader(conf.out(), recordType);
            }
        }
    }

    /**
     * Flushes the output and emits everything that follows the records: the closing of the
     * wrapping array, the session's result postfix, and the end of the HTML wrapper.
     */
    private void printPostfix() throws HyracksDataException {
        conf.out().flush();
        if (wrapArray) {
            conf.out().println(" ]");
        }
        try {
            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
        } catch (AlgebricksException e) {
            throw new HyracksDataException(e);
        }
        if (conf.is(SessionConfig.FORMAT_HTML)) {
            conf.out().println("</pre>");
        }
    }

    /**
     * Prints a single record after applying the per-record transformations (JSON indentation,
     * CSV line terminator, quoting) and adds it to the statistics.
     */
    private void displayRecord(String result) {
        String record = result;
        if (indentJSON) {
            // TODO(tillw): this is inefficient - do this during record generation
            record = JSONUtil.indent(record, 2);
        }
        if (conf.fmt() == SessionConfig.OutputFormat.CSV) {
            // TODO(tillw): this is inefficient as well
            record = record + "\r\n";
        }
        if (quoteRecord) {
            // TODO(tillw): this is inefficient as well
            record = JSONUtil.quoteAndEscape(record);
        }
        conf.out().print(record);
        stats.setCount(stats.getCount() + 1);
        // TODO(tillw) fix this approximation
        // (the size is counted in chars of the transformed record, not in bytes)
        stats.setSize(stats.getSize() + record.length());
    }

    /**
     * Prints a single, already materialized record between prefix and postfix. The record is
     * always quoted, regardless of the session's quote-record flag.
     *
     * @param record
     *            the record text to print
     * @throws HyracksDataException
     *             if the prefix or postfix cannot be written
     */
    public void print(String record) throws HyracksDataException {
        printPrefix();
        // TODO(tillw) evil hack
        quoteRecord = true;
        displayRecord(record);
        printPostfix();
    }

    /**
     * Drains {@code resultReader} frame by frame and prints every tuple as one record between
     * prefix and postfix. Tuples are decoded as UTF-8; for CSV output a trailing {@code '\n'} byte
     * of a tuple is dropped before decoding.
     *
     * @param resultReader
     *            an opened reader positioned at the start of the result set
     * @throws HyracksDataException
     *             if reading a frame or writing the framing fails
     */
    public void print(ResultReader resultReader) throws HyracksDataException {
        printPrefix();

        final IFrameTupleAccessor fta = resultReader.getFrameTupleAccessor();
        final IFrame frame = new VSizeFrame(resultDisplayFrameMgr);

        while (resultReader.read(frame) > 0) {
            final ByteBuffer frameBuffer = frame.getBuffer();
            final byte[] frameBytes = frameBuffer.array();
            fta.reset(frameBuffer);
            final int last = fta.getTupleCount();
            for (int tIndex = 0; tIndex < last; tIndex++) {
                final int start = fta.getTupleStartOffset(tIndex);
                int length = fta.getTupleEndOffset(tIndex) - start;
                if (conf.fmt() == SessionConfig.OutputFormat.CSV
                        && ((length > 0) && (frameBytes[start + length - 1] == '\n'))) {
                    length--;
                }
                String result = new String(frameBytes, start, length, UTF_8);
                if (wrapArray && notFirst) {
                    conf.out().print(", ");
                }
                notFirst = true;
                displayRecord(result);
            }
            frameBuffer.clear();
        }

        printPostfix();
    }
}
--git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java new file mode 100644 index 0000000..99235be --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.result; + +import org.apache.asterix.om.util.AsterixAppContextInfo; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; +import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.dataset.IHyracksDatasetReader; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; + +public class ResultReader { + private final IHyracksDataset hyracksDataset; + + private IHyracksDatasetReader reader; + + private IFrameTupleAccessor frameTupleAccessor; + + // Number of parallel result reader buffers + public static final int NUM_READERS = 1; + + public static final int FRAME_SIZE = AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize(); + + public ResultReader(IHyracksDataset hdc) { + hyracksDataset = hdc; + } + + public void open(JobId jobId, ResultSetId resultSetId) throws HyracksDataException { + reader = hyracksDataset.createReader(jobId, resultSetId); + frameTupleAccessor = new ResultFrameTupleAccessor(); + } + + public Status getStatus() { + return reader.getResultStatus(); + } + + public int read(IFrame frame) throws HyracksDataException { + return reader.read(frame); + } + + public IFrameTupleAccessor getFrameTupleAccessor() { + return frameTupleAccessor; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java new file mode 100644 index 0000000..b5fd96e --- 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.app.result;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.asterix.api.http.servlet.APIServlet;
import org.apache.asterix.common.app.SessionConfig;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.http.ParseException;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

/**
 * Static helpers for rendering query results and errors: delegating result display to
 * {@link ResultPrinter}, HTML-escaping, and turning exceptions into web-UI or JSON error output.
 */
public class ResultUtil {
    private static final Logger LOGGER = Logger.getLogger(ResultUtil.class.getName());

    /** The characters escaped by {@link #escapeHTML(String)}, mapped to their HTML entities. */
    public static final Map<Character, String> HTML_ENTITIES = Collections.unmodifiableMap(Stream.of(
            new AbstractMap.SimpleImmutableEntry<>('"', "&quot;"), new AbstractMap.SimpleImmutableEntry<>('&', "&amp;"),
            new AbstractMap.SimpleImmutableEntry<>('<', "&lt;"), new AbstractMap.SimpleImmutableEntry<>('>', "&gt;"))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

    /** First run of digits; used to pull the line number out of a parse error message. */
    private static final Pattern LINE_NUMBER_PATTERN = Pattern.compile("\\d+");

    private ResultUtil() {

    }

    /**
     * escapes html entities in aString
     * <p>
     * The input is scanned once, character by character. Replacing one entity at a time over the
     * whole string would make the result depend on the order of the replacements: an
     * {@code '&'} produced by an earlier replacement could be escaped a second time.
     *
     * @param aString
     *            the text to escape; must not be {@code null}
     * @return escaped String
     */
    public static String escapeHTML(String aString) {
        StringBuilder escaped = new StringBuilder(aString.length());
        for (int i = 0; i < aString.length(); i++) {
            char c = aString.charAt(i);
            String entity = HTML_ENTITIES.get(c);
            if (entity == null) {
                escaped.append(c);
            } else {
                escaped.append(entity);
            }
        }
        return escaped.toString();
    }

    /**
     * Prints all records of {@code resultReader} using a new {@link ResultPrinter}.
     */
    public static void displayResults(ResultReader resultReader, SessionConfig conf, Stats stats,
            ARecordType recordType) throws HyracksDataException {
        new ResultPrinter(conf, stats, recordType).print(resultReader);
    }

    /**
     * Prints the single {@code record} using a new {@link ResultPrinter}.
     */
    public static void displayResults(String record, SessionConfig conf, Stats stats, ARecordType recordType)
            throws HyracksDataException {
        new ResultPrinter(conf, stats, recordType).print(record);
    }

    /**
     * Builds the JSON error object with the keys {@code error-code} (an array of code and
     * message), {@code summary} and {@code stacktrace}.
     *
     * @param errorCode
     *            numeric error code
     * @param errorMessage
     *            error message; also used as summary when {@code errorSummary} is empty
     * @param errorSummary
     *            summary text, or the empty string if there is none
     * @param errorStackTrace
     *            stack trace text
     * @return the error object; if building it fails the failure is logged and the partially
     *         filled object is returned
     */
    public static JSONObject getErrorResponse(int errorCode, String errorMessage, String errorSummary,
            String errorStackTrace) {
        JSONObject errorResp = new JSONObject();
        JSONArray errorArray = new JSONArray();
        errorArray.put(errorCode);
        errorArray.put(errorMessage);
        try {
            errorResp.put("error-code", errorArray);
            if (!"".equals(errorSummary)) {
                errorResp.put("summary", errorSummary);
            } else {
                //parse exception
                errorResp.put("summary", errorMessage);
            }
            errorResp.put("stacktrace", errorStackTrace);
        } catch (JSONException e) {
            LOGGER.warn("Failed to build the result's JSON object", e);
            // TODO(madhusudancs): Figure out what to do when JSONException occurs while building the results.
        }
        return errorResp;
    }

    /**
     * Writes an HTML-escaped error report (message, summary, full stack trace) for {@code e}
     * to {@code out}, formatted with the error template.
     */
    public static void webUIErrorHandler(PrintWriter out, Exception e) {
        String errorTemplate = readTemplateFile("/webui/errortemplate.html", "%s\n%s\n%s");

        String errorOutput = String.format(errorTemplate, escapeHTML(extractErrorMessage(e)),
                escapeHTML(extractErrorSummary(e)), escapeHTML(extractFullStackTrace(e)));
        out.println(errorOutput);
    }

    /**
     * Writes the parse error message built by {@link #buildParseExceptionMessage(Throwable, String)}
     * to {@code out}, formatted with the message template.
     */
    public static void webUIParseExceptionHandler(PrintWriter out, Throwable e, String query) {
        // NOTE(review): the markup of the fallback template was reconstructed as a <pre> wrapper
        // around the message - confirm the exact tag/attributes against the committed file.
        String errorTemplate =
                readTemplateFile("/webui/errortemplate_message.html", "<pre class=\"error\">%s\n</pre>");

        String errorOutput = String.format(errorTemplate, buildParseExceptionMessage(e, query));
        out.println(errorOutput);
    }

    /**
     * Writes the JSON error response for {@code e} to {@code out}. The error code is 2 for
     * {@link ParseException}, 3 for {@link AlgebricksException}, 4 for
     * {@link HyracksDataException} and 99 otherwise.
     */
    public static void apiErrorHandler(PrintWriter out, Exception e) {
        int errorCode = 99;
        if (e instanceof ParseException) {
            errorCode = 2;
        } else if (e instanceof AlgebricksException) {
            errorCode = 3;
        } else if (e instanceof HyracksDataException) {
            errorCode = 4;
        }

        JSONObject errorResp = ResultUtil.getErrorResponse(errorCode, extractErrorMessage(e), extractErrorSummary(e),
                extractFullStackTrace(e));
        out.write(errorResp.toString());
    }

    /**
     * Builds an HTML-safe description of a parse error: the exception message followed, when the
     * message mentions a line number after the word "line", by the offending line of the query.
     *
     * @param e
     *            the parse failure; a {@code null} message is treated as empty
     * @param query
     *            the query text the line number refers to
     * @return the message, with {@code <} and {@code >} escaped in the exception message and the
     *         quoted query line fully HTML-escaped
     */
    public static String buildParseExceptionMessage(Throwable e, String query) {
        StringBuilder errorMessage = new StringBuilder();
        String rawMessage = e.getMessage();
        String message = rawMessage == null ? "" : rawMessage.replace("<", "&lt;").replace(">", "&gt;");
        errorMessage.append("Error: ").append(message).append('\n');
        int pos = message.indexOf("line");
        if (pos > 0) {
            Matcher m = LINE_NUMBER_PATTERN.matcher(message);
            if (m.find(pos)) {
                int lineNo = Integer.parseInt(m.group());
                String[] lines = query.split("\n");
                // Line numbers are 1-based; anything outside the query is reported as blank
                // instead of indexing out of bounds.
                if (lineNo < 1 || lineNo > lines.length) {
                    errorMessage.append("===> &lt;BLANK LINE&gt; \n");
                } else {
                    // The query is user input and ends up in an HTML page, so escape it too.
                    errorMessage.append("==> ").append(escapeHTML(lines[lineNo - 1]));
                }
            }
        }
        return errorMessage.toString();
    }

    /**
     * Follows the cause chain of {@code cause} to its end.
     *
     * @return the last throwable in the chain; {@code cause} itself if it has no cause
     */
    private static Throwable getRootCause(Throwable cause) {
        Throwable currentCause = cause;
        Throwable nextCause = cause.getCause();
        // The identity check guards against a throwable that reports itself as its own cause.
        while (nextCause != null && nextCause != currentCause) {
            currentCause = nextCause;
            nextCause = currentCause.getCause();
        }
        return currentCause;
    }

    /**
     * Extract the message in the root cause of the stack trace:
     *
     * @param e
     *            the failure to describe
     * @return error message string: the root cause's localized message (or a generic text when it
     *         has none) followed by the root cause's class name without package, in brackets.
     */
    private static String extractErrorMessage(Throwable e) {
        Throwable cause = getRootCause(e);
        String fullyQualifiedExceptionClassName = cause.getClass().getName();
        String[] hierarchySplits = fullyQualifiedExceptionClassName.split("\\.");
        //try returning the class without package qualification
        String exceptionClassName = hierarchySplits[hierarchySplits.length - 1];
        String localizedMessage = cause.getLocalizedMessage();
        if (localizedMessage == null) {
            localizedMessage = "Internal error. Please check instance logs for further details.";
        }
        return localizedMessage + " [" + exceptionClassName + "]";
    }

    /**
     * Extract the meaningful part of a stack trace:
     * a. the causes in the stack trace hierarchy
     * b. the top exception for each cause
     *
     * @param e
     *            the failure to summarize
     * @return the concatenated message containing a and b.
     */
    private static String extractErrorSummary(Throwable e) {
        StringBuilder errorMessageBuilder = new StringBuilder();
        Throwable cause = e;
        errorMessageBuilder.append(cause.getLocalizedMessage());
        while (cause != null) {
            StackTraceElement[] stackTraceElements = cause.getStackTrace();
            errorMessageBuilder.append(stackTraceElements.length > 0 ? "\n caused by: " + stackTraceElements[0] : "");
            cause = cause.getCause();
        }
        return errorMessageBuilder.toString();
    }

    /**
     * Extract the full stack trace:
     *
     * @param e
     *            the failure whose trace is rendered
     * @return the string containing the full stack trace of the error.
     */
    public static String extractFullStackTrace(Throwable e) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter);
        e.printStackTrace(printWriter);
        return stringWriter.toString();
    }

    /**
     * Read the template file which is stored as a resource and return its content. If the file does not exist or is
     * not readable return the default template string.
     * <p>
     * The resource is resolved relative to {@link APIServlet} and decoded as UTF-8. Lines are
     * concatenated without their line terminators.
     *
     * @param path
     *            The path to the resource template file
     * @param defaultTemplate
     *            The default template string if the template file does not exist or is not readable
     * @return The template string to be used to render the output.
     */
    private static String readTemplateFile(String path, String defaultTemplate) {
        try (InputStream is = APIServlet.class.getResourceAsStream(path)) {
            if (is == null) {
                // getResourceAsStream signals a missing resource with null, not an exception.
                LOGGER.warn("Template file " + path + " not found, using the default template");
                return defaultTemplate;
            }
            StringBuilder sb = new StringBuilder();
            try (BufferedReader br =
                    new BufferedReader(new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8))) {
                String line = br.readLine();
                while (line != null) {
                    sb.append(line);
                    line = br.readLine();
                }
            }
            return sb.toString();
        } catch (IOException ioe) {
            LOGGER.warn("Unable to read template error message file", ioe);
            return defaultTemplate;
        }
    }
}
package org.apache.asterix.app.translator;

import java.util.List;

import org.apache.asterix.app.cc.CompilerExtensionManager;
import org.apache.asterix.common.app.SessionConfig;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutorFactory;

/**
 * {@link IStatementExecutorFactory} that creates {@link QueryTranslator} instances, handing each
 * of them the same {@link CompilerExtensionManager}.
 */
public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {

    /** Extension manager passed to every translator this factory creates. */
    protected final CompilerExtensionManager cExtensionManager;

    /**
     * @param cExtensionManager
     *            the extension manager to pass on to created translators
     */
    public DefaultStatementExecutorFactory(CompilerExtensionManager cExtensionManager) {
        this.cExtensionManager = cExtensionManager;
    }

    /**
     * Creates a new {@link QueryTranslator} for the given statements.
     *
     * @param aqlStatements
     *            the statements to execute
     * @param conf
     *            the session configuration
     * @param compilationProvider
     *            the language compilation provider
     * @return a new translator wired with this factory's extension manager
     */
    @Override
    public QueryTranslator create(List<Statement> aqlStatements, SessionConfig conf,
            ILangCompilationProvider compilationProvider) {
        return new QueryTranslator(aqlStatements, conf, compilationProvider, cExtensionManager);
    }

}