asterixdb-commits mailing list archives

From: amo...@apache.org
Subject: [1/3] asterixdb git commit: Ensure Metadata locks are acquired for SQL++ queries
Date: Sun, 02 Apr 2017 21:49:24 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master ff915a9ec -> 6eb0175f9


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
deleted file mode 100644
index 2e35ed4..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-
-/**
- * This is a singleton class used to maintain the version of each external dataset with indexes.
- * It should be consolidated once better global dataset lock management is introduced.
- *
- * @author alamouda
- */
-public class ExternalDatasetsRegistry {
-    public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
-    private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
-
-    private ExternalDatasetsRegistry() {
-        globalRegister = new ConcurrentHashMap<>();
-    }
-
-    /**
-     * Get the current version of the dataset
-     *
-     * @param dataset the external dataset whose version is requested
-     * @return the current version of the dataset
-     */
-    public int getDatasetVersion(Dataset dataset) {
-        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
-        if (datasetAccessMgr == null) {
-            globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
-            datasetAccessMgr = globalRegister.get(key);
-        }
-        return datasetAccessMgr.getVersion();
-    }
-
-    public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) {
-
-        Map<String, Integer> locks;
-        String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        // check first if the lock was acquired already
-        locks = metadataProvider.getLocks();
-        if (locks == null) {
-            locks = new HashMap<>();
-            metadataProvider.setLocks(locks);
-        } else {
-            // if dataset was accessed already by this job, return the registered version
-            Integer version = locks.get(lockKey);
-            if (version != null) {
-                return version;
-            }
-        }
-
-        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(lockKey);
-        if (datasetAccessMgr == null) {
-            globalRegister.putIfAbsent(lockKey, new ExternalDatasetAccessManager());
-            datasetAccessMgr = globalRegister.get(lockKey);
-        }
-
-        // acquire the correct lock
-        int version = datasetAccessMgr.queryBegin();
-        locks.put(lockKey, version);
-        return version;
-    }
-
-    public void refreshBegin(Dataset dataset) {
-        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
-        if (datasetAccessMgr == null) {
-            globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
-            datasetAccessMgr = globalRegister.get(key);
-        }
-        // acquire the correct lock
-        datasetAccessMgr.refreshBegin();
-    }
-
-    public void removeDatasetInfo(Dataset dataset) {
-        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        globalRegister.remove(key);
-    }
-
-    public void refreshEnd(Dataset dataset, boolean success) {
-        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        globalRegister.get(key).refreshEnd(success);
-    }
-
-    public void buildIndexBegin(Dataset dataset, boolean firstIndex) {
-        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
-        if (datasetAccessMgr == null) {
-            globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
-            datasetAccessMgr = globalRegister.get(key);
-        }
-        // acquire the correct lock
-        datasetAccessMgr.buildIndexBegin(firstIndex);
-    }
-
-    public void buildIndexEnd(Dataset dataset, boolean firstIndex) {
-        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
-        globalRegister.get(key).buildIndexEnd(firstIndex);
-    }
-
-    public void releaseAcquiredLocks(MetadataProvider metadataProvider) {
-        Map<String, Integer> locks = metadataProvider.getLocks();
-        if (locks == null) {
-            return;
-        } else {
-            // release the version lock registered for each dataset accessed by this job
-            Set<Entry<String, Integer>> acquiredLocks = locks.entrySet();
-            for (Entry<String, Integer> entry : acquiredLocks) {
-                ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey());
-                if (accessManager != null) {
-                    accessManager.queryEnd(entry.getValue());
-                }
-            }
-            locks.clear();
-        }
-    }
-}
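
The registry deleted above (relocated to org.apache.asterix.metadata.lock in this commit, per the import change further down) creates one ExternalDatasetAccessManager per dataset with a check / putIfAbsent / re-get sequence, which stays correct under races because putIfAbsent never replaces a value another thread already inserted. A minimal standalone sketch of that idiom (the VersionRegistry name and the int[] holder are illustrative, not from the commit):

    import java.util.concurrent.ConcurrentHashMap;

    class VersionRegistry {
        private final ConcurrentHashMap<String, int[]> registry = new ConcurrentHashMap<>();

        int currentVersion(String key) {
            int[] holder = registry.get(key);
            if (holder == null) {
                // putIfAbsent keeps whichever value was inserted first;
                // the follow-up get observes the winner of the race
                registry.putIfAbsent(key, new int[] { 0 });
                holder = registry.get(key);
            }
            return holder[0];
        }
    }

Since Java 8, registry.computeIfAbsent(key, k -> new int[] { 0 }) collapses the three steps into a single atomic call; that is the style the MetadataLockManager below already uses for its lock maps.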

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
deleted file mode 100644
index 9292008..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.FeedConnection;
-
-public class MetadataLockManager {
-
-    public static final MetadataLockManager INSTANCE = new MetadataLockManager();
-    private static final Function<String, ReentrantReadWriteLock> LOCK_FUNCTION = key -> new ReentrantReadWriteLock();
-    private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
-    private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks;
-
-    private MetadataLockManager() {
-        dataversesLocks = new ConcurrentHashMap<>();
-        datasetsLocks = new ConcurrentHashMap<>();
-        functionsLocks = new ConcurrentHashMap<>();
-        nodeGroupsLocks = new ConcurrentHashMap<>();
-        feedsLocks = new ConcurrentHashMap<>();
-        feedPolicyLocks = new ConcurrentHashMap<>();
-        compactionPolicyLocks = new ConcurrentHashMap<>();
-        dataTypeLocks = new ConcurrentHashMap<>();
-        extensionLocks = new ConcurrentHashMap<>();
-    }
-
-    public void acquireDataverseReadLock(String dataverseName) {
-        ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
-        dvLock.readLock().lock();
-    }
-
-    public void releaseDataverseReadLock(String dataverseName) {
-        dataversesLocks.get(dataverseName).readLock().unlock();
-    }
-
-    public void acquireDataverseWriteLock(String dataverseName) {
-        ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
-        dvLock.writeLock().lock();
-    }
-
-    public void releaseDataverseWriteLock(String dataverseName) {
-        dataversesLocks.get(dataverseName).writeLock().unlock();
-    }
-
-    public void acquireDatasetReadLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-    }
-
-    public void releaseDatasetReadLock(String datasetName) {
-        datasetsLocks.get(datasetName).releaseReadLock();
-    }
-
-    public void acquireDatasetWriteLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireWriteLock();
-    }
-
-    public void releaseDatasetWriteLock(String datasetName) {
-        datasetsLocks.get(datasetName).releaseWriteLock();
-    }
-
-    public void acquireDatasetModifyLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-        dsLock.acquireReadModifyLock();
-    }
-
-    public void releaseDatasetModifyLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        dsLock.releaseReadModifyLock();
-        dsLock.releaseReadLock();
-    }
-
-    public void acquireDatasetCreateIndexLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-        dsLock.acquireWriteModifyLock();
-    }
-
-    public void releaseDatasetCreateIndexLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        dsLock.releaseWriteModifyLock();
-        dsLock.releaseReadLock();
-    }
-
-    public void acquireExternalDatasetRefreshLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-        dsLock.acquireRefreshLock();
-    }
-
-    public void releaseExternalDatasetRefreshLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        dsLock.releaseRefreshLock();
-        dsLock.releaseReadLock();
-    }
-
-    public void acquireFunctionReadLock(String functionName) {
-        ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
-        fLock.readLock().lock();
-    }
-
-    public void releaseFunctionReadLock(String functionName) {
-        functionsLocks.get(functionName).readLock().unlock();
-    }
-
-    public void acquireFunctionWriteLock(String functionName) {
-        ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
-        fLock.writeLock().lock();
-    }
-
-    public void releaseFunctionWriteLock(String functionName) {
-        functionsLocks.get(functionName).writeLock().unlock();
-    }
-
-    public void acquireNodeGroupReadLock(String nodeGroupName) {
-        ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
-        ngLock.readLock().lock();
-    }
-
-    public void releaseNodeGroupReadLock(String nodeGroupName) {
-        nodeGroupsLocks.get(nodeGroupName).readLock().unlock();
-    }
-
-    public void acquireNodeGroupWriteLock(String nodeGroupName) {
-        ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
-        ngLock.writeLock().lock();
-    }
-
-    public void releaseNodeGroupWriteLock(String nodeGroupName) {
-        nodeGroupsLocks.get(nodeGroupName).writeLock().unlock();
-    }
-
-    public void acquireFeedReadLock(String feedName) {
-        ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
-        fLock.readLock().lock();
-    }
-
-    public void releaseFeedReadLock(String feedName) {
-        feedsLocks.get(feedName).readLock().unlock();
-    }
-
-    public void acquireFeedWriteLock(String feedName) {
-        ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
-        fLock.writeLock().lock();
-    }
-
-    public void releaseFeedWriteLock(String feedName) {
-        feedsLocks.get(feedName).writeLock().unlock();
-    }
-
-    public void acquireFeedPolicyWriteLock(String policyName) {
-        ReentrantReadWriteLock fLock = feedPolicyLocks.computeIfAbsent(policyName, LOCK_FUNCTION);
-        fLock.writeLock().lock();
-    }
-
-    public void releaseFeedPolicyWriteLock(String policyName) {
-        feedPolicyLocks.get(policyName).writeLock().unlock();
-    }
-
-    public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
-        ReentrantReadWriteLock compactionPolicyLock =
-                compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
-        compactionPolicyLock.readLock().lock();
-    }
-
-    public void releaseCompactionPolicyReadLock(String compactionPolicyName) {
-        compactionPolicyLocks.get(compactionPolicyName).readLock().unlock();
-    }
-
-    public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
-        ReentrantReadWriteLock compactionPolicyLock =
-                compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
-        compactionPolicyLock.writeLock().lock();
-    }
-
-    public void releaseCompactionPolicyWriteLock(String compactionPolicyName) {
-        compactionPolicyLocks.get(compactionPolicyName).writeLock().unlock();
-    }
-
-    public void acquireDataTypeReadLock(String dataTypeName) {
-        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
-        dataTypeLock.readLock().lock();
-    }
-
-    public void releaseDataTypeReadLock(String dataTypeName) {
-        dataTypeLocks.get(dataTypeName).readLock().unlock();
-    }
-
-    public void acquireDataTypeWriteLock(String dataTypeName) {
-        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
-        dataTypeLock.writeLock().lock();
-    }
-
-    public void releaseDataTypeWriteLock(String dataTypeName) {
-        dataTypeLocks.get(dataTypeName).writeLock().unlock();
-    }
-
-    public void createDatasetBegin(String dataverseName, String itemTypeDataverseName,
-            String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
-            String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
-            boolean isDefaultCompactionPolicy) {
-        acquireDataverseReadLock(dataverseName);
-        if (!dataverseName.equals(itemTypeDataverseName)) {
-            acquireDataverseReadLock(itemTypeDataverseName);
-        }
-        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
-                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
-            acquireDataverseReadLock(metaItemTypeDataverseName);
-        }
-        acquireDataTypeReadLock(itemTypeFullyQualifiedName);
-        if (metaItemTypeFullyQualifiedName != null
-                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
-            acquireDataTypeReadLock(metaItemTypeFullyQualifiedName);
-        }
-        acquireNodeGroupReadLock(nodeGroupName);
-        if (!isDefaultCompactionPolicy) {
-            acquireCompactionPolicyReadLock(compactionPolicyName);
-        }
-        acquireDatasetWriteLock(datasetFullyQualifiedName);
-    }
-
-    public void createDatasetEnd(String dataverseName, String itemTypeDataverseName, String itemTypeFullyQualifiedName,
-            String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, String nodeGroupName,
-            String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
-        releaseDatasetWriteLock(datasetFullyQualifiedName);
-        if (!isDefaultCompactionPolicy) {
-            releaseCompactionPolicyReadLock(compactionPolicyName);
-        }
-        releaseNodeGroupReadLock(nodeGroupName);
-        if (metaItemTypeFullyQualifiedName != null
-                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
-            releaseDataTypeReadLock(metaItemTypeFullyQualifiedName);
-        }
-        releaseDataTypeReadLock(itemTypeFullyQualifiedName);
-        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
-                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
-            releaseDataverseReadLock(metaItemTypeDataverseName);
-        }
-        if (!dataverseName.equals(itemTypeDataverseName)) {
-            releaseDataverseReadLock(itemTypeDataverseName);
-        }
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void createIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetCreateIndexLock(datasetFullyQualifiedName);
-    }
-
-    public void createIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetCreateIndexLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void createTypeBegin(String dataverseName, String itemTypeFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDataTypeWriteLock(itemTypeFullyQualifiedName);
-    }
-
-    public void createTypeEnd(String dataverseName, String itemTypeFullyQualifiedName) {
-        releaseDataTypeWriteLock(itemTypeFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetWriteLock(datasetFullyQualifiedName);
-    }
-
-    public void dropDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetWriteLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetWriteLock(datasetFullyQualifiedName);
-    }
-
-    public void dropIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetWriteLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropTypeBegin(String dataverseName, String dataTypeFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDataTypeWriteLock(dataTypeFullyQualifiedName);
-    }
-
-    public void dropTypeEnd(String dataverseName, String dataTypeFullyQualifiedName) {
-        releaseDataTypeWriteLock(dataTypeFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void functionStatementBegin(String dataverseName, String functionFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFunctionWriteLock(functionFullyQualifiedName);
-    }
-
-    public void functionStatementEnd(String dataverseName, String functionFullyQualifiedName) {
-        releaseFunctionWriteLock(functionFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void modifyDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetModifyLock(datasetFullyQualifiedName);
-    }
-
-    public void modifyDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetModifyLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
-            List<String> datasets) {
-        dataverses.add(dataverseName);
-        datasets.add(datasetFullyQualifiedName);
-        Collections.sort(dataverses);
-        Collections.sort(datasets);
-
-        String previous = null;
-        for (int i = 0; i < dataverses.size(); i++) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                acquireDataverseReadLock(current);
-                previous = current;
-            }
-        }
-
-        for (int i = 0; i < datasets.size(); i++) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                if (current.equals(datasetFullyQualifiedName)) {
-                    acquireDatasetModifyLock(current);
-                } else {
-                    acquireDatasetReadLock(current);
-                }
-                previous = current;
-            }
-        }
-    }
-
-    public void insertDeleteUpsertEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
-            List<String> datasets) {
-        String previous = null;
-        for (int i = dataverses.size() - 1; i >= 0; i--) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                releaseDataverseReadLock(current);
-                previous = current;
-            }
-        }
-        for (int i = datasets.size() - 1; i >= 0; i--) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                if (current.equals(datasetFullyQualifiedName)) {
-                    releaseDatasetModifyLock(current);
-                } else {
-                    releaseDatasetReadLock(current);
-                }
-                previous = current;
-            }
-        }
-    }
-
-    public void dropFeedBegin(String dataverseName, String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedWriteLock(feedFullyQualifiedName);
-    }
-
-    public void dropFeedEnd(String dataverseName, String feedFullyQualifiedName) {
-        releaseFeedWriteLock(feedFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropFeedPolicyBegin(String dataverseName, String policyName) {
-        acquireFeedWriteLock(policyName);
-        acquireDataverseReadLock(dataverseName);
-    }
-
-    public void dropFeedPolicyEnd(String dataverseName, String policyName) {
-        releaseFeedWriteLock(policyName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void startFeedBegin(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedReadLock(feedName);
-        for (FeedConnection feedConnection : feedConnections) {
-            acquireDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
-        }
-    }
-
-    public void startFeedEnd(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
-        releaseDataverseReadLock(dataverseName);
-        releaseFeedReadLock(feedName);
-        for (FeedConnection feedConnection : feedConnections) {
-            releaseDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
-        }
-    }
-
-    public void StopFeedBegin(String dataverseName, String feedName) {
-        // TODO: dataset lock?
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedReadLock(feedName);
-    }
-
-    public void StopFeedEnd(String dataverseName, String feedName) {
-        releaseDataverseReadLock(dataverseName);
-        releaseFeedReadLock(feedName);
-    }
-
-    public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedWriteLock(feedFullyQualifiedName);
-    }
-
-    public void createFeedEnd(String dataverseName, String feedFullyQualifiedName) {
-        releaseFeedWriteLock(feedFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void connectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-        acquireFeedReadLock(feedFullyQualifiedName);
-    }
-
-    public void connectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
-        releaseFeedReadLock(feedFullyQualifiedName);
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void createFeedPolicyBegin(String dataverseName, String policyName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedPolicyWriteLock(policyName);
-    }
-
-    public void createFeedPolicyEnd(String dataverseName, String policyName) {
-        releaseFeedPolicyWriteLock(policyName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-        acquireFeedReadLock(feedFullyQualifiedName);
-    }
-
-    public void disconnectFeedEnd(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        releaseFeedReadLock(feedFullyQualifiedName);
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-        acquireFeedReadLock(feedFullyQualifiedName);
-    }
-
-    public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        releaseFeedReadLock(feedFullyQualifiedName);
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-    }
-
-    public void compactEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void queryBegin(Dataverse dataverse, List<String> dataverses, List<String> datasets) {
-        if (dataverse != null) {
-            dataverses.add(dataverse.getDataverseName());
-        }
-        Collections.sort(dataverses);
-        Collections.sort(datasets);
-
-        String previous = null;
-        for (int i = 0; i < dataverses.size(); i++) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                acquireDataverseReadLock(current);
-                previous = current;
-            }
-        }
-
-        for (int i = 0; i < datasets.size(); i++) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                acquireDatasetReadLock(current);
-                previous = current;
-            }
-        }
-    }
-
-    public void queryEnd(List<String> dataverses, List<String> datasets) {
-        String previous = null;
-        for (int i = dataverses.size() - 1; i >= 0; i--) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                releaseDataverseReadLock(current);
-                previous = current;
-            }
-        }
-        for (int i = datasets.size() - 1; i >= 0; i--) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                releaseDatasetReadLock(current);
-                previous = current;
-            }
-        }
-    }
-
-    public void refreshDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireExternalDatasetRefreshLock(datasetFullyQualifiedName);
-    }
-
-    public void refreshDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void acquireExtensionReadLock(String entityName) {
-        ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
-        entityLock.readLock().lock();
-    }
-
-    public void releaseExtensionReadLock(String entityName) {
-        extensionLocks.get(entityName).readLock().unlock();
-    }
-
-    public void acquireExtensionWriteLock(String entityName) {
-        ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
-        entityLock.writeLock().lock();
-    }
-
-    public void releaseExtensionWriteLock(String entityName) {
-        extensionLocks.get(entityName).writeLock().unlock();
-    }
-}
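
queryBegin/queryEnd and insertDeleteUpsertBegin/insertDeleteUpsertEnd above avoid deadlock by sorting entity names and acquiring every lock in that single global order, skipping duplicates, then releasing in reverse. A condensed sketch of the pattern under an illustrative class name (OrderedLocking is not part of the commit):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class OrderedLocking {
        private final ConcurrentHashMap<String, ReentrantReadWriteLock> locks = new ConcurrentHashMap<>();

        // Acquire read locks in one global (sorted) order; two callers that both
        // follow this order can never each hold a lock the other is waiting on.
        void lockAll(List<String> names) {
            List<String> sorted = new ArrayList<>(names);
            Collections.sort(sorted);
            String previous = null;
            for (String current : sorted) {
                if (!current.equals(previous)) { // skip duplicates in the sorted list
                    locks.computeIfAbsent(current, k -> new ReentrantReadWriteLock()).readLock().lock();
                    previous = current;
                }
            }
        }

        // Release in reverse sorted order, mirroring queryEnd above.
        void unlockAll(List<String> names) {
            List<String> sorted = new ArrayList<>(names);
            Collections.sort(sorted);
            String previous = null;
            for (int i = sorted.size() - 1; i >= 0; i--) {
                String current = sorted.get(i);
                if (!current.equals(previous)) {
                    locks.get(current).readLock().unlock();
                    previous = current;
                }
            }
        }
    }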

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
index 8859b9d..45034de 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
@@ -32,6 +32,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 35e7acb..a4b849f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -48,19 +48,19 @@ public class SplitsAndConstraintsUtil {
         ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons();
         String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
         for (int j = 0; j < clusterPartition.length; j++) {
-                File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                    clusterPartition[j].getPartitionId()) + File.separator + relPathFile);
-                splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
+            File f = new File(
+                    StoragePathUtil.prepareStoragePartitionPath(storageDirName, clusterPartition[j].getPartitionId())
+                            + File.separator + relPathFile);
+            splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
         }
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileSplit[] getDatasetSplits(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+    public static FileSplit[] getDatasetSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
+            String targetIdxName, boolean temp) throws AlgebricksException {
         try {
-            File relPathFile =
-                    new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                    dataset.getDatasetName(), targetIdxName));
             List<String> nodeGroup =
                     MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
             if (nodeGroup == null) {
@@ -92,12 +92,11 @@ public class SplitsAndConstraintsUtil {
         }
     }
 
-    private static FileSplit[] getFilesIndexSplits(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+    private static FileSplit[] getFilesIndexSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
+            String targetIdxName, boolean create) throws AlgebricksException {
         try {
-            File relPathFile =
-                    new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                    dataset.getDatasetName(), targetIdxName));
             List<String> nodeGroup =
                     MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
             if (nodeGroup == null) {
@@ -114,14 +113,14 @@ public class SplitsAndConstraintsUtil {
                     // Only the first partition when create
                     File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
                             nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
-                    splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
-                            .getPath()));
+                    splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition],
+                            f.getPath()));
                 } else {
                     for (int k = 0; k < nodePartitions.length; k++) {
                         File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
                                 nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
-                        splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
-                                .getPath()));
+                        splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition],
+                                f.getPath()));
                     }
                 }
             }
@@ -138,9 +137,9 @@ public class SplitsAndConstraintsUtil {
     }
 
     public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getFilesIndexSplitProviderAndConstraints(
-            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
-            boolean create) throws AlgebricksException {
-        FileSplit[] splits = getFilesIndexSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+            Dataset dataset, MetadataTransactionContext mdTxnCtx, String targetIdxName, boolean create)
+            throws AlgebricksException {
+        FileSplit[] splits = getFilesIndexSplits(dataset, mdTxnCtx, targetIdxName, create);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 
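The hunks above move getDatasetSplits and getFilesIndexSplits from (dataverseName, datasetName) string pairs to an already-resolved Dataset, so the helpers no longer repeat the MetadataManager.INSTANCE.getDataset lookup internally. A hypothetical call site showing the effect of the signature change (the surrounding variables are assumptions, not shown in these hunks):

    // before: the helper re-fetched the Dataset from the metadata node itself
    FileSplit[] splits =
            SplitsAndConstraintsUtil.getDatasetSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);

    // after: the caller resolves the Dataset once and passes it through
    Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
    FileSplit[] splits =
            SplitsAndConstraintsUtil.getDatasetSplits(dataset, mdTxnCtx, targetIdxName, temp);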

