asterixdb-commits mailing list archives

From: ima...@apache.org
Subject: [18/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2
Date: Thu, 07 Apr 2016 14:59:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f667bd8,0000000..c5f6915
mode 100644,000000..100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@@ -1,762 -1,0 +1,771 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.common.context;
 +
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.asterix.common.api.IDatasetLifecycleManager;
 +import org.apache.asterix.common.api.ILocalResourceMetadata;
 +import org.apache.asterix.common.config.AsterixStorageProperties;
 +import org.apache.asterix.common.exceptions.ACIDException;
 +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 +import org.apache.asterix.common.transactions.ILogManager;
 +import org.apache.asterix.common.transactions.LogRecord;
 +import org.apache.asterix.common.utils.TransactionUtil;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 +import org.apache.hyracks.storage.am.common.api.IIndex;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 +import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 +import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
 +import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 +import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
 +import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 +import org.apache.hyracks.storage.common.file.LocalResource;
 +
 +public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
 +    private final AsterixStorageProperties storageProperties;
 +    private final Map<Integer, DatasetVirtualBufferCaches> datasetVirtualBufferCachesMap;
 +    private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
 +    private final Map<Integer, DatasetInfo> datasetInfos;
 +    private final ILocalResourceRepository resourceRepository;
 +    private final int firstAvilableUserDatasetID;
 +    private final long capacity;
 +    private long used;
 +    private final ILogManager logManager;
 +    private final LogRecord logRecord;
 +    private final int numPartitions;
 +
 +    public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
-                                    ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID,
-                                    ILogManager logManager, int numPartitions) {
++            ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
++            int numPartitions) {
 +        this.logManager = logManager;
 +        this.storageProperties = storageProperties;
 +        this.resourceRepository = resourceRepository;
 +        this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
 +        this.numPartitions = numPartitions;
 +        datasetVirtualBufferCachesMap = new HashMap<>();
 +        datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
 +        datasetInfos = new HashMap<Integer, DatasetInfo>();
 +        capacity = storageProperties.getMemoryComponentGlobalBudget();
 +        used = 0;
 +        logRecord = new LogRecord();
 +    }
 +
 +    @Override
 +    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
 +        int datasetID = getDIDfromResourcePath(resourcePath);
 +        long resourceID = getResourceIDfromResourcePath(resourcePath);
 +        return getIndex(datasetID, resourceID);
 +    }
 +
 +    @Override
 +    public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
 +        DatasetInfo dsInfo = datasetInfos.get(datasetID);
 +        if (dsInfo == null) {
 +            return null;
 +        }
 +        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
 +        if (iInfo == null) {
 +            return null;
 +        }
 +        return iInfo.index;
 +    }
 +
 +    @Override
 +    public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
 +        int did = getDIDfromResourcePath(resourcePath);
 +        long resourceID = getResourceIDfromResourcePath(resourcePath);
 +        DatasetInfo dsInfo = datasetInfos.get(did);
 +        if (dsInfo == null) {
 +            dsInfo = getDatasetInfo(did);
 +        }
 +        if (!dsInfo.isRegistered) {
 +            dsInfo.isExternal = !index.hasMemoryComponents();
 +            dsInfo.isRegistered = true;
++            dsInfo.durable = ((ILSMIndex) index).isDurable();
 +        }
 +
 +        if (dsInfo.indexes.containsKey(resourceID)) {
 +            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
 +        }
 +        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
 +    }
 +
 +    public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
 +        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
 +        if (lr == null) {
 +            return -1;
 +        }
 +        return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
 +    }
 +
 +    public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
 +        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
 +        if (lr == null) {
 +            return -1;
 +        }
 +        return lr.getResourceId();
 +    }
 +
 +    @Override
 +    public synchronized void unregister(String resourcePath) throws HyracksDataException {
 +        int did = getDIDfromResourcePath(resourcePath);
 +        long resourceID = getResourceIDfromResourcePath(resourcePath);
 +
 +        DatasetInfo dsInfo = datasetInfos.get(did);
 +        IndexInfo iInfo = dsInfo == null ? null : dsInfo.indexes.get(resourceID);
 +
 +        if (dsInfo == null || iInfo == null) {
 +            throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
 +        }
 +
 +        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
 +        if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
 +            throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
 +                    + iInfo.referenceCount + ", Operation tracker number of active operations = "
 +                    + opTracker.getNumActiveOperations() + ")");
 +        }
 +
 +        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
 +        // First wait for any ongoing IO operations
 +        synchronized (dsInfo) {
 +            while (dsInfo.numActiveIOOps > 0) {
 +                try {
 +                    //notification will come from DatasetInfo class (undeclareActiveIOOperation)
 +                    dsInfo.wait();
 +                } catch (InterruptedException e) {
 +                    throw new HyracksDataException(e);
 +                }
 +            }
 +        }
 +
 +        // Flush and wait for it to finish, it is separated from the above wait so they don't deadlock each other.
 +        // TODO: Find a better way to do this.
 +        flushAndWaitForIO(dsInfo, iInfo);
 +
 +        if (iInfo.isOpen) {
 +            ILSMOperationTracker indexOpTracker = iInfo.index.getOperationTracker();
 +            synchronized (indexOpTracker) {
 +                iInfo.index.deactivate(false);
 +            }
 +        }
 +
 +        dsInfo.indexes.remove(resourceID);
 +        if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
 +            removeDatasetFromCache(dsInfo.datasetID);
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void open(String resourcePath) throws HyracksDataException {
 +        int did = getDIDfromResourcePath(resourcePath);
 +        long resourceID = getResourceIDfromResourcePath(resourcePath);
 +
 +        DatasetInfo dsInfo = datasetInfos.get(did);
 +        if (dsInfo == null || !dsInfo.isRegistered) {
 +            throw new HyracksDataException(
 +                    "Failed to open index with resource ID " + resourceID + " since it does not exist.");
 +        }
 +
 +        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
 +        if (iInfo == null) {
 +            throw new HyracksDataException(
 +                    "Failed to open index with resource ID " + resourceID + " since it does not exist.");
 +        }
 +        if (!dsInfo.isOpen && !dsInfo.isExternal) {
 +            initializeDatasetVirtualBufferCache(did);
 +        }
 +
 +        dsInfo.isOpen = true;
 +        dsInfo.touch();
 +        if (!iInfo.isOpen) {
 +            ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
 +            synchronized (opTracker) {
 +                iInfo.index.activate();
 +            }
 +            iInfo.isOpen = true;
 +        }
 +        iInfo.touch();
 +    }
 +
 +    private boolean evictCandidateDataset() throws HyracksDataException {
 +        /**
 +         * We will take a dataset that has no active transactions, it is open (a dataset consuming memory),
 +         * that is not being used (refcount == 0) and has been least recently used, excluding metadata datasets.
 +         * The sort order defined for DatasetInfo maintains this. See DatasetInfo.compareTo().
 +         */
 +        List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
 +        Collections.sort(datasetInfosList);
 +        for (DatasetInfo dsInfo : datasetInfosList) {
 +            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
 +                    .get(dsInfo.datasetID);
 +            if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
 +                    && dsInfo.isOpen && dsInfo.datasetID >= firstAvilableUserDatasetID) {
 +                closeDataset(dsInfo);
 +                return true;
 +            }
 +        }
 +        return false;
 +    }
 +
 +    private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
 +        if (iInfo.isOpen) {
 +            ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
 +                    NoOpOperationCallback.INSTANCE);
 +            accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
 +        }
 +
 +        // Wait for the above flush op.
 +        synchronized (dsInfo) {
 +            while (dsInfo.numActiveIOOps > 0) {
 +                try {
 +                    //notification will come from DatasetInfo class (undeclareActiveIOOperation)
 +                    dsInfo.wait();
 +                } catch (InterruptedException e) {
 +                    throw new HyracksDataException(e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public DatasetInfo getDatasetInfo(int datasetID) {
 +        synchronized (datasetInfos) {
 +            DatasetInfo dsInfo = datasetInfos.get(datasetID);
 +            if (dsInfo == null) {
 +                dsInfo = new DatasetInfo(datasetID);
 +                datasetInfos.put(datasetID, dsInfo);
 +            }
 +            return dsInfo;
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void close(String resourcePath) throws HyracksDataException {
 +        int did = getDIDfromResourcePath(resourcePath);
 +        long resourceID = getResourceIDfromResourcePath(resourcePath);
 +
 +        DatasetInfo dsInfo = datasetInfos.get(did);
 +        if (dsInfo == null) {
 +            throw new HyracksDataException("No index found with resourceID " + resourceID);
 +        }
 +        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
 +        if (iInfo == null) {
 +            throw new HyracksDataException("No index found with resourceID " + resourceID);
 +        }
 +        iInfo.untouch();
 +        dsInfo.untouch();
 +    }
 +
 +    @Override
 +    public synchronized List<IIndex> getOpenIndexes() {
 +        List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
 +        List<IIndex> openIndexes = new ArrayList<IIndex>();
 +        for (IndexInfo iInfo : openIndexesInfo) {
 +            openIndexes.add(iInfo.index);
 +        }
 +        return openIndexes;
 +    }
 +
 +    @Override
 +    public synchronized List<IndexInfo> getOpenIndexesInfo() {
 +        List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
 +        for (DatasetInfo dsInfo : datasetInfos.values()) {
 +            for (IndexInfo iInfo : dsInfo.indexes.values()) {
 +                if (iInfo.isOpen) {
 +                    openIndexesInfo.add(iInfo);
 +                }
 +            }
 +        }
 +        return openIndexesInfo;
 +    }
 +
 +    private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
 +        synchronized (datasetVirtualBufferCachesMap) {
 +            DatasetVirtualBufferCaches vbcs = datasetVirtualBufferCachesMap.get(datasetID);
 +            if (vbcs == null) {
 +                vbcs = initializeDatasetVirtualBufferCache(datasetID);
 +            }
 +            return vbcs;
 +        }
 +    }
 +
 +    @Override
 +    public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
 +        DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
 +        return dvbcs.getVirtualBufferCaches(ioDeviceNum);
 +    }
 +
 +    private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
 +        deallocateDatasetMemory(datasetID);
 +        datasetInfos.remove(datasetID);
 +        datasetVirtualBufferCachesMap.remove(datasetID);
 +        datasetOpTrackers.remove(datasetID);
 +    }
 +
 +    private DatasetVirtualBufferCaches initializeDatasetVirtualBufferCache(int datasetID) {
 +        synchronized (datasetVirtualBufferCachesMap) {
 +            DatasetVirtualBufferCaches dvbcs = new DatasetVirtualBufferCaches(datasetID);
 +            datasetVirtualBufferCachesMap.put(datasetID, dvbcs);
 +            return dvbcs;
 +        }
 +    }
++
 +    @Override
 +    public ILSMOperationTracker getOperationTracker(int datasetID) {
 +        synchronized (datasetOpTrackers) {
 +            ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
 +            if (opTracker == null) {
 +                opTracker = new PrimaryIndexOperationTracker(datasetID, logManager, getDatasetInfo(datasetID));
 +                datasetOpTrackers.put(datasetID, opTracker);
 +            }
 +            return opTracker;
 +        }
 +    }
 +
 +    private static abstract class Info {
 +        protected int referenceCount;
 +        protected boolean isOpen;
 +
 +        public Info() {
 +            referenceCount = 0;
 +            isOpen = false;
 +        }
 +
 +        public void touch() {
 +            ++referenceCount;
 +        }
 +
 +        public void untouch() {
 +            --referenceCount;
 +        }
 +    }
 +
 +    public static class IndexInfo extends Info {
 +        private final ILSMIndex index;
 +        private final long resourceId;
 +        private final int datasetId;
 +
 +        public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
 +            this.index = index;
 +            this.datasetId = datasetId;
 +            this.resourceId = resourceId;
 +        }
 +
 +        public ILSMIndex getIndex() {
 +            return index;
 +        }
 +
 +        public long getResourceId() {
 +            return resourceId;
 +        }
 +
 +        public int getDatasetId() {
 +            return datasetId;
 +        }
 +    }
 +
 +    public static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
 +        private final Map<Long, IndexInfo> indexes;
 +        private final int datasetID;
 +        private long lastAccess;
 +        private int numActiveIOOps;
 +        private boolean isExternal;
 +        private boolean isRegistered;
 +        private boolean memoryAllocated;
++        private boolean durable;
 +
 +        public DatasetInfo(int datasetID) {
 +            this.indexes = new HashMap<Long, IndexInfo>();
 +            this.lastAccess = -1;
 +            this.datasetID = datasetID;
 +            this.isRegistered = false;
 +            this.memoryAllocated = false;
 +        }
 +
 +        @Override
 +        public void touch() {
 +            super.touch();
 +            lastAccess = System.currentTimeMillis();
 +        }
 +
 +        @Override
 +        public void untouch() {
 +            super.untouch();
 +            lastAccess = System.currentTimeMillis();
 +        }
 +
 +        public synchronized void declareActiveIOOperation() {
 +            numActiveIOOps++;
 +        }
 +
 +        public synchronized void undeclareActiveIOOperation() {
 +            numActiveIOOps--;
 +            //notify threads waiting on this dataset info
 +            notifyAll();
 +        }
 +
 +        public synchronized Set<ILSMIndex> getDatasetIndexes() {
 +            Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
 +            for (IndexInfo iInfo : indexes.values()) {
 +                if (iInfo.isOpen) {
 +                    datasetIndexes.add(iInfo.index);
 +                }
 +            }
 +
 +            return datasetIndexes;
 +        }
 +
 +        @Override
 +        public int compareTo(DatasetInfo i) {
 +            // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
 +            //
 +            // Example sort order:
 +            // -------------------
 +            // (F, 0, 70)       <-- largest
 +            // (F, 0, 60)
 +            // (T, 10, 80)
 +            // (T, 10, 70)
 +            // (T, 9, 90)
 +            // (T, 0, 100)      <-- smallest
 +            if (isOpen && !i.isOpen) {
 +                return -1;
 +            } else if (!isOpen && i.isOpen) {
 +                return 1;
 +            } else {
 +                if (referenceCount < i.referenceCount) {
 +                    return -1;
 +                } else if (referenceCount > i.referenceCount) {
 +                    return 1;
 +                } else {
 +                    if (lastAccess < i.lastAccess) {
 +                        return -1;
 +                    } else if (lastAccess > i.lastAccess) {
 +                        return 1;
 +                    } else {
 +                        return 0;
 +                    }
 +                }
 +            }
 +
 +        }
 +
 +        @Override
 +        public String toString() {
 +            return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
 +                    + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
-                     + memoryAllocated;
++                    + memoryAllocated + ", isDurable: " + durable;
++        }
++
++        public boolean isDurable() {
++            return durable;
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void start() {
 +        used = 0;
 +    }
 +
 +    @Override
 +    public synchronized void flushAllDatasets() throws HyracksDataException {
 +        for (DatasetInfo dsInfo : datasetInfos.values()) {
 +            flushDatasetOpenIndexes(dsInfo, false);
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
 +        DatasetInfo datasetInfo = datasetInfos.get(datasetId);
 +        if (datasetInfo != null) {
 +            flushDatasetOpenIndexes(datasetInfo, asyncFlush);
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
 +        //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
 +        for (DatasetInfo dsInfo : datasetInfos.values()) {
 +            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(
 +                    dsInfo.datasetID);
 +            synchronized (opTracker) {
 +                for (IndexInfo iInfo : dsInfo.indexes.values()) {
 +                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
 +                            .getIOOperationCallback();
 +                    if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
 +                            || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
 +                            || opTracker.isFlushOnExit())) {
 +                        long firstLSN = ioCallback.getFirstLSN();
 +                        if (firstLSN < targetLSN) {
 +                            opTracker.setFlushOnExit(true);
 +                            if (opTracker.getNumActiveOperations() == 0) {
 +                                // No Modify operations currently, we need to trigger the flush and we can do so safely
 +                                opTracker.flushIfRequested();
 +                            }
 +                            break;
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    /*
 +     * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
 +     */
 +    private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
-         if (!dsInfo.isExternal) {
++        if (!dsInfo.isExternal && dsInfo.durable) {
 +            synchronized (logRecord) {
 +                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
 +                        dsInfo.indexes.size());
 +                try {
 +                    logManager.log(logRecord);
 +                } catch (ACIDException e) {
 +                    throw new HyracksDataException("could not write flush log while closing dataset", e);
 +                }
 +
 +                try {
 +                    //notification will come from LogPage class (notifyFlushTerminator)
 +                    logRecord.wait();
 +                } catch (InterruptedException e) {
 +                    throw new HyracksDataException(e);
 +                }
 +            }
 +            for (IndexInfo iInfo : dsInfo.indexes.values()) {
 +                //update resource lsn
 +                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.index
 +                        .getIOOperationCallback();
 +                ioOpCallback.updateLastLSN(logRecord.getLSN());
 +            }
 +        }
 +
 +        if (asyncFlush) {
 +            for (IndexInfo iInfo : dsInfo.indexes.values()) {
 +                ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
 +                        NoOpOperationCallback.INSTANCE);
 +                accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
 +            }
 +        } else {
 +            for (IndexInfo iInfo : dsInfo.indexes.values()) {
 +                // TODO: This is not efficient since we flush the indexes sequentially.
 +                // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
 +                // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
 +                flushAndWaitForIO(dsInfo, iInfo);
 +            }
 +        }
 +    }
 +
 +    private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
 +        // First wait for any ongoing IO operations
 +        synchronized (dsInfo) {
 +            while (dsInfo.numActiveIOOps > 0) {
 +                try {
 +                    dsInfo.wait();
 +                } catch (InterruptedException e) {
 +                    throw new HyracksDataException(e);
 +                }
 +            }
 +        }
 +        try {
 +            flushDatasetOpenIndexes(dsInfo, false);
 +        } catch (Exception e) {
 +            throw new HyracksDataException(e);
 +        }
 +        for (IndexInfo iInfo : dsInfo.indexes.values()) {
 +            if (iInfo.isOpen) {
 +                ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
 +                synchronized (opTracker) {
 +                    iInfo.index.deactivate(false);
 +                }
 +                iInfo.isOpen = false;
 +            }
 +            assert iInfo.referenceCount == 0;
 +        }
 +        removeDatasetFromCache(dsInfo.datasetID);
 +        dsInfo.isOpen = false;
 +    }
 +
 +    @Override
 +    public synchronized void closeAllDatasets() throws HyracksDataException {
 +        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
 +        for (DatasetInfo dsInfo : openDatasets) {
 +            closeDataset(dsInfo);
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void closeUserDatasets() throws HyracksDataException {
 +        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
 +        for (DatasetInfo dsInfo : openDatasets) {
 +            if (dsInfo.datasetID >= firstAvilableUserDatasetID) {
 +                closeDataset(dsInfo);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
 +        if (dumpState) {
 +            dumpState(outputStream);
 +        }
 +
 +        closeAllDatasets();
 +
 +        datasetVirtualBufferCachesMap.clear();
 +        datasetOpTrackers.clear();
 +        datasetInfos.clear();
 +    }
 +
 +    @Override
 +    public void dumpState(OutputStream outputStream) throws IOException {
 +        StringBuilder sb = new StringBuilder();
 +
 +        sb.append(String.format("Memory budget = %d\n", capacity));
 +        sb.append(String.format("Memory used = %d\n", used));
 +        sb.append("\n");
 +
 +        String dsHeaderFormat = "%-10s %-6s %-16s %-12s\n";
 +        String dsFormat = "%-10d %-6b %-16d %-12d\n";
 +        String idxHeaderFormat = "%-10s %-11s %-6s %-16s %-6s\n";
 +        String idxFormat = "%-10d %-11d %-6b %-16d %-6s\n";
 +
 +        sb.append("[Datasets]\n");
 +        sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access"));
 +        for (DatasetInfo dsInfo : datasetInfos.values()) {
 +            sb.append(
 +                    String.format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount, dsInfo.lastAccess));
 +        }
 +        sb.append("\n");
 +
 +        sb.append("[Indexes]\n");
 +        sb.append(String.format(idxHeaderFormat, "DatasetID", "ResourceID", "Open", "Reference Count", "Index"));
 +        for (DatasetInfo dsInfo : datasetInfos.values()) {
 +            for (Map.Entry<Long, IndexInfo> entry : dsInfo.indexes.entrySet()) {
 +                IndexInfo iInfo = entry.getValue();
 +                sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen, iInfo.referenceCount,
 +                        iInfo.index));
 +            }
 +        }
 +
 +        outputStream.write(sb.toString().getBytes());
 +    }
 +
 +    private synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
 +        DatasetInfo dsInfo = datasetInfos.get(datasetId);
 +        if (dsInfo == null) {
 +            throw new HyracksDataException(
 +                    "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
 +        }
 +        synchronized (dsInfo) {
 +            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
 +            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
 +                long additionalSize = getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
 +                while (used + additionalSize > capacity) {
 +                    if (!evictCandidateDataset()) {
 +                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
 +                                + " memory since memory budget would be exceeded.");
 +                    }
 +                }
 +                used += additionalSize;
 +                dsInfo.memoryAllocated = true;
 +            }
 +        }
 +    }
 +
 +    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
 +        DatasetInfo dsInfo = datasetInfos.get(datasetId);
 +        if (dsInfo == null) {
 +            throw new HyracksDataException(
 +                    "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
 +        }
 +        synchronized (dsInfo) {
 +            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
 +                used -= getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
 +                dsInfo.memoryAllocated = false;
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
 +        //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
 +        int did = Integer.parseInt(resourcePath);
 +        allocateDatasetMemory(did);
 +    }
 +
 +    private class DatasetVirtualBufferCaches {
 +        private final int datasetID;
 +        private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
 +
 +        public DatasetVirtualBufferCaches(int datasetID) {
 +            this.datasetID = datasetID;
 +        }
 +
 +        private List<IVirtualBufferCache> initializeVirtualBufferCaches(int ioDeviceNum) {
 +            assert ioDeviceVirtualBufferCaches.size() < numPartitions;
 +            int numPages = datasetID < firstAvilableUserDatasetID
 +                    ? storageProperties.getMetadataMemoryComponentNumPages()
 +                    : storageProperties.getMemoryComponentNumPages();
 +            List<IVirtualBufferCache> vbcs = new ArrayList<>();
 +            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
 +                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                         new VirtualBufferCache(new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
-                                 Integer.toString(datasetID)), storageProperties.getMemoryComponentPageSize(),
++                        new VirtualBufferCache(
++                                new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
++                                        Integer.toString(datasetID)),
++                                storageProperties.getMemoryComponentPageSize(),
 +                                numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
 +                vbcs.add(vbc);
 +            }
 +            ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
 +            return vbcs;
 +        }
 +
 +        public List<IVirtualBufferCache> getVirtualBufferCaches(int ioDeviceNum) {
 +            synchronized (ioDeviceVirtualBufferCaches) {
 +                List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
 +                if (vbcs == null) {
 +                    vbcs = initializeVirtualBufferCaches(ioDeviceNum);
 +                }
 +                return vbcs;
 +            }
 +        }
 +
 +        public long getTotalSize() {
 +            int numPages = datasetID < firstAvilableUserDatasetID
 +                    ? storageProperties.getMetadataMemoryComponentNumPages()
 +                    : storageProperties.getMemoryComponentNumPages();
 +
 +            return storageProperties.getMemoryComponentPageSize() * numPages;
 +        }
 +    }
 +
 +}
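
The eviction policy in evictCandidateDataset() above hinges on the sort order that DatasetInfo.compareTo() encodes: open datasets sort before closed ones (true < false), then by reference count, then by last-access time, so the least-recently-used idle dataset surfaces first. A self-contained sketch of just that ordering (a hypothetical class for illustration, not code from this commit):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for DatasetInfo that reproduces only the comparator.
public class EvictionOrderSketch implements Comparable<EvictionOrderSketch> {
    final boolean isOpen;
    final int referenceCount;
    final long lastAccess;

    EvictionOrderSketch(boolean isOpen, int referenceCount, long lastAccess) {
        this.isOpen = isOpen;
        this.referenceCount = referenceCount;
        this.lastAccess = lastAccess;
    }

    @Override
    public int compareTo(EvictionOrderSketch o) {
        if (isOpen != o.isOpen) {
            return isOpen ? -1 : 1; // open (true) sorts before closed (false)
        }
        if (referenceCount != o.referenceCount) {
            return Integer.compare(referenceCount, o.referenceCount);
        }
        return Long.compare(lastAccess, o.lastAccess);
    }

    @Override
    public String toString() {
        return "(" + (isOpen ? "T" : "F") + ", " + referenceCount + ", " + lastAccess + ")";
    }

    public static void main(String[] args) {
        List<EvictionOrderSketch> infos = new ArrayList<>();
        infos.add(new EvictionOrderSketch(false, 0, 70));
        infos.add(new EvictionOrderSketch(true, 10, 80));
        infos.add(new EvictionOrderSketch(true, 0, 100));
        Collections.sort(infos);
        System.out.println(infos); // [(T, 0, 100), (T, 10, 80), (F, 0, 70)]
    }
}

evictCandidateDataset() walks this order and closes the first entry whose operation tracker reports zero active operations, matching the example ordering in the compareTo() comment.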

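flushDatasetOpenIndexes() also shows the handshake that this merge gates on the new durable flag: for durable datasets the caller appends a FLUSH log record and then blocks in logRecord.wait() until the LogFlusher forces the record to disk and wakes it (the notification the comment attributes to LogPage.notifyFlushTerminator). A minimal sketch of that monitor pattern, under stub names of my own (LogRecordStub and FlushLogHandshake are not AsterixDB API):

public class FlushLogHandshake {
    // Stub standing in for LogRecord: one monitor, one "flushed" condition.
    static final class LogRecordStub {
        private long lsn = -1;

        synchronized void awaitFlushed() throws InterruptedException {
            while (lsn < 0) {
                wait(); // mirrors logRecord.wait() in flushDatasetOpenIndexes()
            }
        }

        synchronized void notifyFlushed(long flushedLsn) {
            lsn = flushedLsn;
            notifyAll(); // mirrors the LogFlusher's wake-up call
        }

        synchronized long getLsn() {
            return lsn;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LogRecordStub logRecord = new LogRecordStub();
        Thread logFlusher = new Thread(() -> {
            try {
                Thread.sleep(50); // pretend the log page is being forced to disk
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            logRecord.notifyFlushed(42L);
        });
        logFlusher.start();
        logRecord.awaitFlushed(); // blocks until the "flush log" is durable
        System.out.println("flush log durable at LSN " + logRecord.getLsn());
        logFlusher.join();
    }
}

Non-durable (temporary) datasets skip this handshake entirely, which is exactly what the new dsInfo.durable guard buys.
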
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473,0000000..b3eb281
mode 100644,000000..100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@@ -1,201 -1,0 +1,209 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.asterix.common.context;
 +
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 +import org.apache.asterix.common.exceptions.ACIDException;
 +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 +import org.apache.asterix.common.transactions.AbstractOperationCallback;
 +import org.apache.asterix.common.transactions.ILogManager;
 +import org.apache.asterix.common.transactions.LogRecord;
 +import org.apache.asterix.common.utils.TransactionUtil;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 +
 +public class PrimaryIndexOperationTracker extends BaseOperationTracker {
 +
 +    // Number of active operations on an ILSMIndex instance.
 +    private final AtomicInteger numActiveOperations;
 +    private final ILogManager logManager;
 +    private boolean flushOnExit = false;
 +    private boolean flushLogCreated = false;
 +
 +    public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo) {
 +        super(datasetID, dsInfo);
 +        this.logManager = logManager;
 +        this.numActiveOperations = new AtomicInteger();
 +    }
 +
 +    @Override
 +    public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
 +            IModificationOperationCallback modificationCallback) throws HyracksDataException {
 +        if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
 +            incrementNumActiveOperations(modificationCallback);
 +        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
 +                || opType == LSMOperationType.REPLICATE) {
 +            dsInfo.declareActiveIOOperation();
 +        }
 +    }
 +
 +    @Override
 +    public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
 +            IModificationOperationCallback modificationCallback) throws HyracksDataException {
 +        // Searches are immediately considered complete, because they should not prevent the execution of flushes.
 +        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
 +                || opType == LSMOperationType.REPLICATE) {
 +            completeOperation(index, opType, searchCallback, modificationCallback);
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
 +            ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
 +                    throws HyracksDataException {
 +        if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
 +            decrementNumActiveOperations(modificationCallback);
 +            if (numActiveOperations.get() == 0) {
 +                flushIfRequested();
 +            } else if (numActiveOperations.get() < 0) {
 +                throw new HyracksDataException("The number of active operations cannot be negative!");
 +            }
 +        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
 +                || opType == LSMOperationType.REPLICATE) {
 +            dsInfo.undeclareActiveIOOperation();
 +        }
 +    }
 +
 +    public void flushIfRequested() throws HyracksDataException {
 +        // If we need a flush, and this is the last completing operation, then schedule the flush,
 +        // or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
 +
 +        boolean needsFlush = false;
 +        Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes();
 +
 +        if (!flushOnExit) {
 +            for (ILSMIndex lsmIndex : indexes) {
 +                ILSMIndexInternal lsmIndexInternal = (ILSMIndexInternal) lsmIndex;
 +                if (lsmIndexInternal.hasFlushRequestForCurrentMutableComponent()) {
 +                    needsFlush = true;
 +                    break;
 +                }
 +            }
 +        }
 +
 +        if (needsFlush || flushOnExit) {
 +            //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled.
 +            for (ILSMIndex lsmIndex : indexes) {
 +                AbstractLSMIndex abstractLSMIndex = ((AbstractLSMIndex) lsmIndex);
 +                ILSMOperationTracker opTracker = abstractLSMIndex.getOperationTracker();
 +                synchronized (opTracker) {
 +                    if (abstractLSMIndex.getCurrentMutableComponentState() == ComponentState.READABLE_WRITABLE) {
 +                        abstractLSMIndex.setCurrentMutableComponentState(ComponentState.READABLE_UNWRITABLE);
 +                    }
 +                }
 +            }
 +            LogRecord logRecord = new LogRecord();
-             TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
-                     dsInfo.getDatasetIndexes().size());
-             try {
-                 logManager.log(logRecord);
-             } catch (ACIDException e) {
-                 throw new HyracksDataException("could not write flush log", e);
-             }
- 
-             flushLogCreated = true;
 +            flushOnExit = false;
++            if (dsInfo.isDurable()) {
++                /**
++                 * Generate a FLUSH log.
++                 * Flush will be triggered when the log is written to disk by LogFlusher.
++                 */
++                TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
++                        dsInfo.getDatasetIndexes().size());
++                try {
++                    logManager.log(logRecord);
++                } catch (ACIDException e) {
++                    throw new HyracksDataException("could not write flush log", e);
++                }
++                flushLogCreated = true;
++            } else {
++                //trigger flush for temporary indexes without generating a FLUSH log.
++                triggerScheduleFlush(logRecord);
++            }
 +        }
 +    }
 +
 +    //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
 +    public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
 +        for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
 +
 +            //get resource
 +            ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
 +                    NoOpOperationCallback.INSTANCE);
 +
 +            //update resource lsn
 +            AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
 +                    .getIOOperationCallback();
 +            ioOpCallback.updateLastLSN(logRecord.getLSN());
 +
 +            //schedule flush after update
 +            accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
 +        }
 +
 +        flushLogCreated = false;
 +    }
 +
 +    @Override
 +    public void exclusiveJobCommitted() throws HyracksDataException {
 +        numActiveOperations.set(0);
 +        flushIfRequested();
 +    }
 +
 +    public int getNumActiveOperations() {
 +        return numActiveOperations.get();
 +    }
 +
 +    private void incrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
 +        //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
 +        if (modificationCallback != NoOpOperationCallback.INSTANCE) {
 +            numActiveOperations.incrementAndGet();
 +            ((AbstractOperationCallback) modificationCallback).incrementLocalNumActiveOperations();
 +        }
 +    }
 +
 +    private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
 +        //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
 +        if (modificationCallback != NoOpOperationCallback.INSTANCE) {
 +            numActiveOperations.decrementAndGet();
 +            ((AbstractOperationCallback) modificationCallback).decrementLocalNumActiveOperations();
 +        }
 +    }
 +
 +    public void cleanupNumActiveOperationsForAbortedJob(int numberOfActiveOperations) {
 +        numberOfActiveOperations *= -1;
 +        numActiveOperations.getAndAdd(numberOfActiveOperations);
 +    }
 +
 +    public boolean isFlushOnExit() {
 +        return flushOnExit;
 +    }
 +
 +    public void setFlushOnExit(boolean flushOnExit) {
 +        this.flushOnExit = flushOnExit;
 +    }
 +
 +    public boolean isFlushLogCreated() {
 +        return flushLogCreated;
 +    }
 +
 +}
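
The tracker's core pattern is reference counting of in-flight modifications: beforeOperation() increments, completeOperation() decrements, and whichever operation brings the count to zero while a flush is pending actually schedules it. A self-contained sketch of just that pattern (hypothetical class, not the real tracker, and without the log-record plumbing):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of PrimaryIndexOperationTracker's counting scheme.
public class OperationCounterSketch {
    private final AtomicInteger numActiveOperations = new AtomicInteger();
    private volatile boolean flushRequested;

    void beforeModification() {
        numActiveOperations.incrementAndGet();
    }

    void completeModification() {
        int remaining = numActiveOperations.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("The number of active operations cannot be negative!");
        }
        if (remaining == 0 && flushRequested) {
            scheduleFlush(); // last completer: no writer is inside the mutable component
        }
    }

    void requestFlush() {
        flushRequested = true;
        if (numActiveOperations.get() == 0) {
            scheduleFlush(); // nothing in flight, flush immediately
        }
    }

    private void scheduleFlush() {
        flushRequested = false;
        System.out.println("flush scheduled");
    }

    public static void main(String[] args) {
        OperationCounterSketch tracker = new OperationCounterSketch();
        tracker.beforeModification();
        tracker.requestFlush();         // deferred: one modification still active
        tracker.completeModification(); // prints "flush scheduled"
    }
}

The real class additionally flips the mutable components to READABLE_UNWRITABLE before logging, so no modification can slip in between the zero check and the flush.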

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 5b4035c,0000000..78b06fb
mode 100644,000000..100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@@ -1,73 -1,0 +1,73 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.common.utils;
 +
 +import java.io.File;
 +
 +import org.apache.asterix.common.cluster.ClusterPartition;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 +import org.apache.hyracks.algebricks.common.utils.Pair;
 +import org.apache.hyracks.api.io.FileReference;
 +import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 +
 +public class StoragePathUtil {
 +    public static final String PARTITION_DIR_PREFIX = "partition_";
 +    public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
 +    public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 +    public static final String ADAPTER_INSTANCE_PREFIX = "adapter_";
 +
 +    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
 +            FileSplit[] splits) {
 +        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
 +        String[] loc = new String[splits.length];
 +        for (int p = 0; p < splits.length; p++) {
 +            loc[p] = splits[p].getNodeName();
 +        }
 +        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
 +        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
 +    }
 +
 +    public static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
 +        return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
 +                partition.getPartitionId());
 +    }
 +
 +    public static String prepareStoragePartitionPath(String storageDirName, int partitonId) {
 +        return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId;
 +    }
 +
 +    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
 +        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
 +    }
 +
 +    public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
 +        return dataverseName + File.separator + fullIndexName;
 +    }
 +
 +    private static String prepareFullIndexName(String datasetName, String idxName) {
 +        return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
 +    }
 +
-     public static int getPartitonNumFromName(String name) {
++    public static int getPartitionNumFromName(String name) {
 +        return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
 +    }
 +}
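
A usage sketch for these helpers (the demo class is hypothetical; expected values assume a Unix File.separator of "/" and asterix-common on the classpath):

import org.apache.asterix.common.utils.StoragePathUtil;

public class StoragePathUtilDemo {
    public static void main(String[] args) {
        // "storage/partition_3"
        System.out.println(StoragePathUtil.prepareStoragePartitionPath("storage", 3));
        // 3 -- the renamed getPartitionNumFromName inverts the prefix naming scheme
        System.out.println(StoragePathUtil.getPartitionNumFromName("partition_3"));
        // "Metadata/Dataset_idx_primary"
        System.out.println(StoragePathUtil.prepareDataverseIndexName("Metadata", "Dataset", "primary"));
    }
}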

